You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ek...@apache.org on 2012/08/16 03:32:58 UTC

svn commit: r1373684 [33/35] - in /giraph/site: ./ apidocs/ apidocs/org/apache/giraph/ apidocs/org/apache/giraph/benchmark/ apidocs/org/apache/giraph/benchmark/class-use/ apidocs/org/apache/giraph/bsp/ apidocs/org/apache/giraph/bsp/class-use/ apidocs/o...

Modified: giraph/site/xref/org/apache/giraph/graph/BspServiceWorker.html
URL: http://svn.apache.org/viewvc/giraph/site/xref/org/apache/giraph/graph/BspServiceWorker.html?rev=1373684&r1=1373683&r2=1373684&view=diff
==============================================================================
--- giraph/site/xref/org/apache/giraph/graph/BspServiceWorker.html (original)
+++ giraph/site/xref/org/apache/giraph/graph/BspServiceWorker.html Thu Aug 16 01:32:41 2012
@@ -1222,373 +1222,411 @@
 <a name="1212" href="#1212">1212</a>     }
 <a name="1213" href="#1213">1213</a> 
 <a name="1214" href="#1214">1214</a>     getFs().createNewFile(validFilePath);
-<a name="1215" href="#1215">1215</a>   }
-<a name="1216" href="#1216">1216</a> 
-<a name="1217" href="#1217">1217</a>   @Override
-<a name="1218" href="#1218">1218</a>   <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> loadCheckpoint(<strong class="jxr_keyword">long</strong> superstep) {
-<a name="1219" href="#1219">1219</a>     <strong class="jxr_keyword">if</strong> (getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-<a name="1220" href="#1220">1220</a>         GiraphJob.USE_NETTY_DEFAULT)) {
-<a name="1221" href="#1221">1221</a>       <strong class="jxr_keyword">try</strong> {
-<a name="1222" href="#1222">1222</a>         <em class="jxr_comment">// clear old message stores</em>
-<a name="1223" href="#1223">1223</a>         getServerData().getIncomingMessageStore().clearAll();
-<a name="1224" href="#1224">1224</a>         getServerData().getCurrentMessageStore().clearAll();
-<a name="1225" href="#1225">1225</a>       } <strong class="jxr_keyword">catch</strong> (IOException e) {
-<a name="1226" href="#1226">1226</a>         <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
-<a name="1227" href="#1227">1227</a>             <span class="jxr_string">"loadCheckpoint: Failed to clear message stores "</span>, e);
-<a name="1228" href="#1228">1228</a>       }
-<a name="1229" href="#1229">1229</a>     }
-<a name="1230" href="#1230">1230</a> 
-<a name="1231" href="#1231">1231</a>     <em class="jxr_comment">// Algorithm:</em>
-<a name="1232" href="#1232">1232</a>     <em class="jxr_comment">// Examine all the partition owners and load the ones</em>
-<a name="1233" href="#1233">1233</a>     <em class="jxr_comment">// that match my hostname and id from the master designated checkpoint</em>
-<a name="1234" href="#1234">1234</a>     <em class="jxr_comment">// prefixes.</em>
-<a name="1235" href="#1235">1235</a>     <strong class="jxr_keyword">long</strong> startPos = 0;
-<a name="1236" href="#1236">1236</a>     <strong class="jxr_keyword">int</strong> loadedPartitions = 0;
-<a name="1237" href="#1237">1237</a>     <strong class="jxr_keyword">for</strong> (PartitionOwner partitionOwner :
-<a name="1238" href="#1238">1238</a>       workerGraphPartitioner.getPartitionOwners()) {
-<a name="1239" href="#1239">1239</a>       <strong class="jxr_keyword">if</strong> (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
-<a name="1240" href="#1240">1240</a>         String metadataFile =
-<a name="1241" href="#1241">1241</a>             partitionOwner.getCheckpointFilesPrefix() +
-<a name="1242" href="#1242">1242</a>             CHECKPOINT_METADATA_POSTFIX;
-<a name="1243" href="#1243">1243</a>         String partitionsFile =
-<a name="1244" href="#1244">1244</a>             partitionOwner.getCheckpointFilesPrefix() +
-<a name="1245" href="#1245">1245</a>             CHECKPOINT_VERTICES_POSTFIX;
-<a name="1246" href="#1246">1246</a>         <strong class="jxr_keyword">try</strong> {
-<a name="1247" href="#1247">1247</a>           <strong class="jxr_keyword">int</strong> partitionId = -1;
-<a name="1248" href="#1248">1248</a>           DataInputStream metadataStream =
-<a name="1249" href="#1249">1249</a>               getFs().open(<strong class="jxr_keyword">new</strong> Path(metadataFile));
-<a name="1250" href="#1250">1250</a>           <strong class="jxr_keyword">int</strong> partitions = metadataStream.readInt();
-<a name="1251" href="#1251">1251</a>           <strong class="jxr_keyword">for</strong> (<strong class="jxr_keyword">int</strong> i = 0; i &lt; partitions; ++i) {
-<a name="1252" href="#1252">1252</a>             startPos = metadataStream.readLong();
-<a name="1253" href="#1253">1253</a>             partitionId = metadataStream.readInt();
-<a name="1254" href="#1254">1254</a>             <strong class="jxr_keyword">if</strong> (partitionId == partitionOwner.getPartitionId()) {
-<a name="1255" href="#1255">1255</a>               <strong class="jxr_keyword">break</strong>;
-<a name="1256" href="#1256">1256</a>             }
-<a name="1257" href="#1257">1257</a>           }
-<a name="1258" href="#1258">1258</a>           <strong class="jxr_keyword">if</strong> (partitionId != partitionOwner.getPartitionId()) {
-<a name="1259" href="#1259">1259</a>             <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1260" href="#1260">1260</a>                 <span class="jxr_string">"loadCheckpoint: "</span> + partitionOwner +
-<a name="1261" href="#1261">1261</a>                 <span class="jxr_string">" not found!"</span>);
-<a name="1262" href="#1262">1262</a>           }
-<a name="1263" href="#1263">1263</a>           metadataStream.close();
-<a name="1264" href="#1264">1264</a>           Partition&lt;I, V, E, M&gt; partition =
-<a name="1265" href="#1265">1265</a>               <strong class="jxr_keyword">new</strong> Partition&lt;I, V, E, M&gt;(
-<a name="1266" href="#1266">1266</a>                   getConfiguration(),
-<a name="1267" href="#1267">1267</a>                   partitionId);
-<a name="1268" href="#1268">1268</a>           DataInputStream partitionsStream =
-<a name="1269" href="#1269">1269</a>               getFs().open(<strong class="jxr_keyword">new</strong> Path(partitionsFile));
-<a name="1270" href="#1270">1270</a>           <strong class="jxr_keyword">if</strong> (partitionsStream.skip(startPos) != startPos) {
-<a name="1271" href="#1271">1271</a>             <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1272" href="#1272">1272</a>                 <span class="jxr_string">"loadCheckpoint: Failed to skip "</span> + startPos +
-<a name="1273" href="#1273">1273</a>                 <span class="jxr_string">" on "</span> + partitionsFile);
-<a name="1274" href="#1274">1274</a>           }
-<a name="1275" href="#1275">1275</a>           partition.readFields(partitionsStream);
-<a name="1276" href="#1276">1276</a>           <strong class="jxr_keyword">if</strong> (partitionsStream.readBoolean()) {
-<a name="1277" href="#1277">1277</a>             getServerData().getCurrentMessageStore().readFieldsForPartition(
-<a name="1278" href="#1278">1278</a>                 partitionsStream, partitionId);
-<a name="1279" href="#1279">1279</a>           }
-<a name="1280" href="#1280">1280</a>           partitionsStream.close();
-<a name="1281" href="#1281">1281</a>           <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1282" href="#1282">1282</a>             LOG.info(<span class="jxr_string">"loadCheckpoint: Loaded partition "</span> +
-<a name="1283" href="#1283">1283</a>                 partition);
-<a name="1284" href="#1284">1284</a>           }
-<a name="1285" href="#1285">1285</a>           <strong class="jxr_keyword">if</strong> (getPartitionMap().put(partitionId, partition) != <strong class="jxr_keyword">null</strong>) {
-<a name="1286" href="#1286">1286</a>             <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1287" href="#1287">1287</a>                 <span class="jxr_string">"loadCheckpoint: Already has partition owner "</span> +
-<a name="1288" href="#1288">1288</a>                     partitionOwner);
-<a name="1289" href="#1289">1289</a>           }
-<a name="1290" href="#1290">1290</a>           ++loadedPartitions;
-<a name="1291" href="#1291">1291</a>         } <strong class="jxr_keyword">catch</strong> (IOException e) {
-<a name="1292" href="#1292">1292</a>           <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
-<a name="1293" href="#1293">1293</a>               <span class="jxr_string">"loadCheckpoing: Failed to get partition owner "</span> +
-<a name="1294" href="#1294">1294</a>                   partitionOwner, e);
-<a name="1295" href="#1295">1295</a>         }
-<a name="1296" href="#1296">1296</a>       }
-<a name="1297" href="#1297">1297</a>     }
-<a name="1298" href="#1298">1298</a>     <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1299" href="#1299">1299</a>       LOG.info(<span class="jxr_string">"loadCheckpoint: Loaded "</span> + loadedPartitions +
-<a name="1300" href="#1300">1300</a>           <span class="jxr_string">" partitions of out "</span> +
-<a name="1301" href="#1301">1301</a>           workerGraphPartitioner.getPartitionOwners().size() +
-<a name="1302" href="#1302">1302</a>           <span class="jxr_string">" total."</span>);
-<a name="1303" href="#1303">1303</a>     }
-<a name="1304" href="#1304">1304</a>     <em class="jxr_comment">// Communication service needs to setup the connections prior to</em>
-<a name="1305" href="#1305">1305</a>     <em class="jxr_comment">// processing vertices</em>
-<a name="1306" href="#1306">1306</a>     commService.setup();
-<a name="1307" href="#1307">1307</a>   }
-<a name="1308" href="#1308">1308</a> 
-<a name="1309" href="#1309">1309</a>   <em class="jxr_javadoccomment">/**</em>
-<a name="1310" href="#1310">1310</a> <em class="jxr_javadoccomment">   * Send the worker partitions to their destination workers</em>
-<a name="1311" href="#1311">1311</a> <em class="jxr_javadoccomment">   *</em>
-<a name="1312" href="#1312">1312</a> <em class="jxr_javadoccomment">   * @param workerPartitionMap Map of worker info to the partitions stored</em>
-<a name="1313" href="#1313">1313</a> <em class="jxr_javadoccomment">   *        on this worker to be sent</em>
-<a name="1314" href="#1314">1314</a> <em class="jxr_javadoccomment">   */</em>
-<a name="1315" href="#1315">1315</a>   <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> sendWorkerPartitions(
-<a name="1316" href="#1316">1316</a>       Map&lt;WorkerInfo, List&lt;Integer&gt;&gt; workerPartitionMap) {
-<a name="1317" href="#1317">1317</a>     List&lt;Entry&lt;WorkerInfo, List&lt;Integer&gt;&gt;&gt; randomEntryList =
-<a name="1318" href="#1318">1318</a>         <strong class="jxr_keyword">new</strong> ArrayList&lt;Entry&lt;WorkerInfo, List&lt;Integer&gt;&gt;&gt;(
-<a name="1319" href="#1319">1319</a>             workerPartitionMap.entrySet());
-<a name="1320" href="#1320">1320</a>     Collections.shuffle(randomEntryList);
-<a name="1321" href="#1321">1321</a>     <strong class="jxr_keyword">for</strong> (Entry&lt;WorkerInfo, List&lt;Integer&gt;&gt; workerPartitionList :
-<a name="1322" href="#1322">1322</a>       randomEntryList) {
-<a name="1323" href="#1323">1323</a>       <strong class="jxr_keyword">for</strong> (Integer partitionId : workerPartitionList.getValue()) {
-<a name="1324" href="#1324">1324</a>         Partition&lt;I, V, E, M&gt; partition =
-<a name="1325" href="#1325">1325</a>             getPartitionMap().get(partitionId);
-<a name="1326" href="#1326">1326</a>         <strong class="jxr_keyword">if</strong> (partition == <strong class="jxr_keyword">null</strong>) {
-<a name="1327" href="#1327">1327</a>           <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1328" href="#1328">1328</a>               <span class="jxr_string">"sendWorkerPartitions: Couldn't find partition "</span> +
-<a name="1329" href="#1329">1329</a>                   partitionId + <span class="jxr_string">" to send to "</span> +
-<a name="1330" href="#1330">1330</a>                   workerPartitionList.getKey());
-<a name="1331" href="#1331">1331</a>         }
-<a name="1332" href="#1332">1332</a>         <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1333" href="#1333">1333</a>           LOG.info(<span class="jxr_string">"sendWorkerPartitions: Sending worker "</span> +
-<a name="1334" href="#1334">1334</a>               workerPartitionList.getKey() + <span class="jxr_string">" partition "</span> +
-<a name="1335" href="#1335">1335</a>               partitionId);
-<a name="1336" href="#1336">1336</a>         }
-<a name="1337" href="#1337">1337</a>         getGraphMapper().getGraphState().getWorkerCommunications().
-<a name="1338" href="#1338">1338</a>             sendPartitionRequest(workerPartitionList.getKey(),
-<a name="1339" href="#1339">1339</a>                 partition);
-<a name="1340" href="#1340">1340</a>         getPartitionMap().remove(partitionId);
-<a name="1341" href="#1341">1341</a>       }
-<a name="1342" href="#1342">1342</a>     }
-<a name="1343" href="#1343">1343</a> 
-<a name="1344" href="#1344">1344</a>     <strong class="jxr_keyword">try</strong> {
-<a name="1345" href="#1345">1345</a>       getGraphMapper().getGraphState().getWorkerCommunications().flush();
-<a name="1346" href="#1346">1346</a>     } <strong class="jxr_keyword">catch</strong> (IOException e) {
-<a name="1347" href="#1347">1347</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(<span class="jxr_string">"sendWorkerPartitions: Flush failed"</span>, e);
-<a name="1348" href="#1348">1348</a>     }
-<a name="1349" href="#1349">1349</a>     String myPartitionExchangeDonePath =
-<a name="1350" href="#1350">1350</a>         getPartitionExchangeWorkerPath(
-<a name="1351" href="#1351">1351</a>             getApplicationAttempt(), getSuperstep(), getWorkerInfo());
-<a name="1352" href="#1352">1352</a>     <strong class="jxr_keyword">try</strong> {
-<a name="1353" href="#1353">1353</a>       getZkExt().createExt(myPartitionExchangeDonePath,
-<a name="1354" href="#1354">1354</a>           <strong class="jxr_keyword">null</strong>,
-<a name="1355" href="#1355">1355</a>           Ids.OPEN_ACL_UNSAFE,
-<a name="1356" href="#1356">1356</a>           CreateMode.PERSISTENT,
-<a name="1357" href="#1357">1357</a>           <strong class="jxr_keyword">true</strong>);
-<a name="1358" href="#1358">1358</a>     } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
-<a name="1359" href="#1359">1359</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1360" href="#1360">1360</a>           <span class="jxr_string">"sendWorkerPartitions: KeeperException to create "</span> +
-<a name="1361" href="#1361">1361</a>               myPartitionExchangeDonePath, e);
-<a name="1362" href="#1362">1362</a>     } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
-<a name="1363" href="#1363">1363</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1364" href="#1364">1364</a>           <span class="jxr_string">"sendWorkerPartitions: InterruptedException to create "</span> +
-<a name="1365" href="#1365">1365</a>               myPartitionExchangeDonePath, e);
-<a name="1366" href="#1366">1366</a>     }
-<a name="1367" href="#1367">1367</a>     <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1368" href="#1368">1368</a>       LOG.info(<span class="jxr_string">"sendWorkerPartitions: Done sending all my partitions."</span>);
-<a name="1369" href="#1369">1369</a>     }
-<a name="1370" href="#1370">1370</a>   }
-<a name="1371" href="#1371">1371</a> 
-<a name="1372" href="#1372">1372</a>   @Override
-<a name="1373" href="#1373">1373</a>   <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">final</strong> <strong class="jxr_keyword">void</strong> exchangeVertexPartitions(
-<a name="1374" href="#1374">1374</a>       Collection&lt;? <strong class="jxr_keyword">extends</strong> PartitionOwner&gt; masterSetPartitionOwners) {
-<a name="1375" href="#1375">1375</a>     <em class="jxr_comment">// 1. Fix the addresses of the partition ids if they have changed.</em>
-<a name="1376" href="#1376">1376</a>     <em class="jxr_comment">// 2. Send all the partitions to their destination workers in a random</em>
-<a name="1377" href="#1377">1377</a>     <em class="jxr_comment">//    fashion.</em>
-<a name="1378" href="#1378">1378</a>     <em class="jxr_comment">// 3. Notify completion with a ZooKeeper stamp</em>
-<a name="1379" href="#1379">1379</a>     <em class="jxr_comment">// 4. Wait for all my dependencies to be done (if any)</em>
-<a name="1380" href="#1380">1380</a>     <em class="jxr_comment">// 5. Add the partitions to myself.</em>
-<a name="1381" href="#1381">1381</a>     <a href="../../../../org/apache/giraph/graph/partition/PartitionExchange.html">PartitionExchange</a> partitionExchange =
-<a name="1382" href="#1382">1382</a>         workerGraphPartitioner.updatePartitionOwners(
-<a name="1383" href="#1383">1383</a>             getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
-<a name="1384" href="#1384">1384</a>     commService.fixPartitionIdToSocketAddrMap();
-<a name="1385" href="#1385">1385</a> 
-<a name="1386" href="#1386">1386</a>     Map&lt;WorkerInfo, List&lt;Integer&gt;&gt; sendWorkerPartitionMap =
-<a name="1387" href="#1387">1387</a>         partitionExchange.getSendWorkerPartitionMap();
-<a name="1388" href="#1388">1388</a>     <strong class="jxr_keyword">if</strong> (!workerPartitionMap.isEmpty()) {
-<a name="1389" href="#1389">1389</a>       sendWorkerPartitions(sendWorkerPartitionMap);
-<a name="1390" href="#1390">1390</a>     }
-<a name="1391" href="#1391">1391</a> 
-<a name="1392" href="#1392">1392</a>     Set&lt;WorkerInfo&gt; myDependencyWorkerSet =
-<a name="1393" href="#1393">1393</a>         partitionExchange.getMyDependencyWorkerSet();
-<a name="1394" href="#1394">1394</a>     Set&lt;String&gt; workerIdSet = <strong class="jxr_keyword">new</strong> HashSet&lt;String&gt;();
-<a name="1395" href="#1395">1395</a>     <strong class="jxr_keyword">for</strong> (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
-<a name="1396" href="#1396">1396</a>       <strong class="jxr_keyword">if</strong> (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
-<a name="1397" href="#1397">1397</a>         <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1398" href="#1398">1398</a>             <span class="jxr_string">"exchangeVertexPartitions: Duplicate entry "</span> + tmpWorkerInfo);
-<a name="1399" href="#1399">1399</a>       }
-<a name="1400" href="#1400">1400</a>     }
-<a name="1401" href="#1401">1401</a>     <strong class="jxr_keyword">if</strong> (myDependencyWorkerSet.isEmpty() &amp;&amp; workerPartitionMap.isEmpty()) {
-<a name="1402" href="#1402">1402</a>       <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1403" href="#1403">1403</a>         LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Nothing to exchange, "</span> +
-<a name="1404" href="#1404">1404</a>             <span class="jxr_string">"exiting early"</span>);
-<a name="1405" href="#1405">1405</a>       }
-<a name="1406" href="#1406">1406</a>       <strong class="jxr_keyword">return</strong>;
+<a name="1215" href="#1215">1215</a> 
+<a name="1216" href="#1216">1216</a>     <em class="jxr_comment">// Notify master that checkpoint is stored</em>
+<a name="1217" href="#1217">1217</a>     String workerWroteCheckpoint =
+<a name="1218" href="#1218">1218</a>         getWorkerWroteCheckpointPath(getApplicationAttempt(),
+<a name="1219" href="#1219">1219</a>             getSuperstep()) + <span class="jxr_string">"/"</span> + getHostnamePartitionId();
+<a name="1220" href="#1220">1220</a>     <strong class="jxr_keyword">try</strong> {
+<a name="1221" href="#1221">1221</a>       getZkExt().createExt(workerWroteCheckpoint,
+<a name="1222" href="#1222">1222</a>           <strong class="jxr_keyword">new</strong> byte[0],
+<a name="1223" href="#1223">1223</a>           Ids.OPEN_ACL_UNSAFE,
+<a name="1224" href="#1224">1224</a>           CreateMode.PERSISTENT,
+<a name="1225" href="#1225">1225</a>           <strong class="jxr_keyword">true</strong>);
+<a name="1226" href="#1226">1226</a>     } <strong class="jxr_keyword">catch</strong> (KeeperException.NodeExistsException e) {
+<a name="1227" href="#1227">1227</a>       LOG.warn(<span class="jxr_string">"finishSuperstep: wrote checkpoint worker path "</span> +
+<a name="1228" href="#1228">1228</a>           workerWroteCheckpoint + <span class="jxr_string">" already exists!"</span>);
+<a name="1229" href="#1229">1229</a>     } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
+<a name="1230" href="#1230">1230</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(<span class="jxr_string">"Creating "</span> + workerWroteCheckpoint +
+<a name="1231" href="#1231">1231</a>           <span class="jxr_string">" failed with KeeperException"</span>, e);
+<a name="1232" href="#1232">1232</a>     } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
+<a name="1233" href="#1233">1233</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(<span class="jxr_string">"Creating "</span> + workerWroteCheckpoint +
+<a name="1234" href="#1234">1234</a>           <span class="jxr_string">" failed with InterruptedException"</span>, e);
+<a name="1235" href="#1235">1235</a>     }
+<a name="1236" href="#1236">1236</a>   }
+<a name="1237" href="#1237">1237</a> 
+<a name="1238" href="#1238">1238</a>   @Override
+<a name="1239" href="#1239">1239</a>   <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> loadCheckpoint(<strong class="jxr_keyword">long</strong> superstep) {
+<a name="1240" href="#1240">1240</a>     <strong class="jxr_keyword">if</strong> (getConfiguration().getBoolean(GiraphJob.USE_NETTY,
+<a name="1241" href="#1241">1241</a>         GiraphJob.USE_NETTY_DEFAULT)) {
+<a name="1242" href="#1242">1242</a>       <strong class="jxr_keyword">try</strong> {
+<a name="1243" href="#1243">1243</a>         <em class="jxr_comment">// clear old message stores</em>
+<a name="1244" href="#1244">1244</a>         getServerData().getIncomingMessageStore().clearAll();
+<a name="1245" href="#1245">1245</a>         getServerData().getCurrentMessageStore().clearAll();
+<a name="1246" href="#1246">1246</a>       } <strong class="jxr_keyword">catch</strong> (IOException e) {
+<a name="1247" href="#1247">1247</a>         <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
+<a name="1248" href="#1248">1248</a>             <span class="jxr_string">"loadCheckpoint: Failed to clear message stores "</span>, e);
+<a name="1249" href="#1249">1249</a>       }
+<a name="1250" href="#1250">1250</a>     }
+<a name="1251" href="#1251">1251</a> 
+<a name="1252" href="#1252">1252</a>     <em class="jxr_comment">// Algorithm:</em>
+<a name="1253" href="#1253">1253</a>     <em class="jxr_comment">// Examine all the partition owners and load the ones</em>
+<a name="1254" href="#1254">1254</a>     <em class="jxr_comment">// that match my hostname and id from the master designated checkpoint</em>
+<a name="1255" href="#1255">1255</a>     <em class="jxr_comment">// prefixes.</em>
+<a name="1256" href="#1256">1256</a>     <strong class="jxr_keyword">long</strong> startPos = 0;
+<a name="1257" href="#1257">1257</a>     <strong class="jxr_keyword">int</strong> loadedPartitions = 0;
+<a name="1258" href="#1258">1258</a>     <strong class="jxr_keyword">for</strong> (PartitionOwner partitionOwner :
+<a name="1259" href="#1259">1259</a>       workerGraphPartitioner.getPartitionOwners()) {
+<a name="1260" href="#1260">1260</a>       <strong class="jxr_keyword">if</strong> (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+<a name="1261" href="#1261">1261</a>         String metadataFile =
+<a name="1262" href="#1262">1262</a>             partitionOwner.getCheckpointFilesPrefix() +
+<a name="1263" href="#1263">1263</a>             CHECKPOINT_METADATA_POSTFIX;
+<a name="1264" href="#1264">1264</a>         String partitionsFile =
+<a name="1265" href="#1265">1265</a>             partitionOwner.getCheckpointFilesPrefix() +
+<a name="1266" href="#1266">1266</a>             CHECKPOINT_VERTICES_POSTFIX;
+<a name="1267" href="#1267">1267</a>         <strong class="jxr_keyword">try</strong> {
+<a name="1268" href="#1268">1268</a>           <strong class="jxr_keyword">int</strong> partitionId = -1;
+<a name="1269" href="#1269">1269</a>           DataInputStream metadataStream =
+<a name="1270" href="#1270">1270</a>               getFs().open(<strong class="jxr_keyword">new</strong> Path(metadataFile));
+<a name="1271" href="#1271">1271</a>           <strong class="jxr_keyword">int</strong> partitions = metadataStream.readInt();
+<a name="1272" href="#1272">1272</a>           <strong class="jxr_keyword">for</strong> (<strong class="jxr_keyword">int</strong> i = 0; i &lt; partitions; ++i) {
+<a name="1273" href="#1273">1273</a>             startPos = metadataStream.readLong();
+<a name="1274" href="#1274">1274</a>             partitionId = metadataStream.readInt();
+<a name="1275" href="#1275">1275</a>             <strong class="jxr_keyword">if</strong> (partitionId == partitionOwner.getPartitionId()) {
+<a name="1276" href="#1276">1276</a>               <strong class="jxr_keyword">break</strong>;
+<a name="1277" href="#1277">1277</a>             }
+<a name="1278" href="#1278">1278</a>           }
+<a name="1279" href="#1279">1279</a>           <strong class="jxr_keyword">if</strong> (partitionId != partitionOwner.getPartitionId()) {
+<a name="1280" href="#1280">1280</a>             <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1281" href="#1281">1281</a>                 <span class="jxr_string">"loadCheckpoint: "</span> + partitionOwner +
+<a name="1282" href="#1282">1282</a>                 <span class="jxr_string">" not found!"</span>);
+<a name="1283" href="#1283">1283</a>           }
+<a name="1284" href="#1284">1284</a>           metadataStream.close();
+<a name="1285" href="#1285">1285</a>           Partition&lt;I, V, E, M&gt; partition =
+<a name="1286" href="#1286">1286</a>               <strong class="jxr_keyword">new</strong> Partition&lt;I, V, E, M&gt;(
+<a name="1287" href="#1287">1287</a>                   getConfiguration(),
+<a name="1288" href="#1288">1288</a>                   partitionId);
+<a name="1289" href="#1289">1289</a>           DataInputStream partitionsStream =
+<a name="1290" href="#1290">1290</a>               getFs().open(<strong class="jxr_keyword">new</strong> Path(partitionsFile));
+<a name="1291" href="#1291">1291</a>           <strong class="jxr_keyword">if</strong> (partitionsStream.skip(startPos) != startPos) {
+<a name="1292" href="#1292">1292</a>             <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1293" href="#1293">1293</a>                 <span class="jxr_string">"loadCheckpoint: Failed to skip "</span> + startPos +
+<a name="1294" href="#1294">1294</a>                 <span class="jxr_string">" on "</span> + partitionsFile);
+<a name="1295" href="#1295">1295</a>           }
+<a name="1296" href="#1296">1296</a>           partition.readFields(partitionsStream);
+<a name="1297" href="#1297">1297</a>           <strong class="jxr_keyword">if</strong> (partitionsStream.readBoolean()) {
+<a name="1298" href="#1298">1298</a>             getServerData().getCurrentMessageStore().readFieldsForPartition(
+<a name="1299" href="#1299">1299</a>                 partitionsStream, partitionId);
+<a name="1300" href="#1300">1300</a>           }
+<a name="1301" href="#1301">1301</a>           partitionsStream.close();
+<a name="1302" href="#1302">1302</a>           <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1303" href="#1303">1303</a>             LOG.info(<span class="jxr_string">"loadCheckpoint: Loaded partition "</span> +
+<a name="1304" href="#1304">1304</a>                 partition);
+<a name="1305" href="#1305">1305</a>           }
+<a name="1306" href="#1306">1306</a>           <strong class="jxr_keyword">if</strong> (getPartitionMap().put(partitionId, partition) != <strong class="jxr_keyword">null</strong>) {
+<a name="1307" href="#1307">1307</a>             <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1308" href="#1308">1308</a>                 <span class="jxr_string">"loadCheckpoint: Already has partition owner "</span> +
+<a name="1309" href="#1309">1309</a>                     partitionOwner);
+<a name="1310" href="#1310">1310</a>           }
+<a name="1311" href="#1311">1311</a>           ++loadedPartitions;
+<a name="1312" href="#1312">1312</a>         } <strong class="jxr_keyword">catch</strong> (IOException e) {
+<a name="1313" href="#1313">1313</a>           <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
+<a name="1314" href="#1314">1314</a>               <span class="jxr_string">"loadCheckpoing: Failed to get partition owner "</span> +
+<a name="1315" href="#1315">1315</a>                   partitionOwner, e);
+<a name="1316" href="#1316">1316</a>         }
+<a name="1317" href="#1317">1317</a>       }
+<a name="1318" href="#1318">1318</a>     }
+<a name="1319" href="#1319">1319</a>     <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1320" href="#1320">1320</a>       LOG.info(<span class="jxr_string">"loadCheckpoint: Loaded "</span> + loadedPartitions +
+<a name="1321" href="#1321">1321</a>           <span class="jxr_string">" partitions of out "</span> +
+<a name="1322" href="#1322">1322</a>           workerGraphPartitioner.getPartitionOwners().size() +
+<a name="1323" href="#1323">1323</a>           <span class="jxr_string">" total."</span>);
+<a name="1324" href="#1324">1324</a>     }
+<a name="1325" href="#1325">1325</a> 
+<a name="1326" href="#1326">1326</a>     <em class="jxr_comment">// Load global statistics</em>
+<a name="1327" href="#1327">1327</a>     String finalizedCheckpointPath =
+<a name="1328" href="#1328">1328</a>         getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+<a name="1329" href="#1329">1329</a>     <strong class="jxr_keyword">try</strong> {
+<a name="1330" href="#1330">1330</a>       DataInputStream finalizedStream =
+<a name="1331" href="#1331">1331</a>           getFs().open(<strong class="jxr_keyword">new</strong> Path(finalizedCheckpointPath));
+<a name="1332" href="#1332">1332</a>       <a href="../../../../org/apache/giraph/graph/GlobalStats.html">GlobalStats</a> globalStats = <strong class="jxr_keyword">new</strong> <a href="../../../../org/apache/giraph/graph/GlobalStats.html">GlobalStats</a>();
+<a name="1333" href="#1333">1333</a>       globalStats.readFields(finalizedStream);
+<a name="1334" href="#1334">1334</a>       getGraphMapper().getGraphState().
+<a name="1335" href="#1335">1335</a>           setTotalNumEdges(globalStats.getEdgeCount()).
+<a name="1336" href="#1336">1336</a>           setTotalNumVertices(globalStats.getVertexCount());
+<a name="1337" href="#1337">1337</a>     } <strong class="jxr_keyword">catch</strong> (IOException e) {
+<a name="1338" href="#1338">1338</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1339" href="#1339">1339</a>           <span class="jxr_string">"loadCheckpoint: Failed to load global statistics"</span>, e);
+<a name="1340" href="#1340">1340</a>     }
+<a name="1341" href="#1341">1341</a> 
+<a name="1342" href="#1342">1342</a>     <em class="jxr_comment">// Communication service needs to setup the connections prior to</em>
+<a name="1343" href="#1343">1343</a>     <em class="jxr_comment">// processing vertices</em>
+<a name="1344" href="#1344">1344</a>     commService.setup();
+<a name="1345" href="#1345">1345</a>   }
+<a name="1346" href="#1346">1346</a> 
+<a name="1347" href="#1347">1347</a>   <em class="jxr_javadoccomment">/**</em>
+<a name="1348" href="#1348">1348</a> <em class="jxr_javadoccomment">   * Send the worker partitions to their destination workers</em>
+<a name="1349" href="#1349">1349</a> <em class="jxr_javadoccomment">   *</em>
+<a name="1350" href="#1350">1350</a> <em class="jxr_javadoccomment">   * @param workerPartitionMap Map of worker info to the partitions stored</em>
+<a name="1351" href="#1351">1351</a> <em class="jxr_javadoccomment">   *        on this worker to be sent</em>
+<a name="1352" href="#1352">1352</a> <em class="jxr_javadoccomment">   */</em>
+<a name="1353" href="#1353">1353</a>   <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> sendWorkerPartitions(
+<a name="1354" href="#1354">1354</a>       Map&lt;WorkerInfo, List&lt;Integer&gt;&gt; workerPartitionMap) {
+<a name="1355" href="#1355">1355</a>     List&lt;Entry&lt;WorkerInfo, List&lt;Integer&gt;&gt;&gt; randomEntryList =
+<a name="1356" href="#1356">1356</a>         <strong class="jxr_keyword">new</strong> ArrayList&lt;Entry&lt;WorkerInfo, List&lt;Integer&gt;&gt;&gt;(
+<a name="1357" href="#1357">1357</a>             workerPartitionMap.entrySet());
+<a name="1358" href="#1358">1358</a>     Collections.shuffle(randomEntryList);
+<a name="1359" href="#1359">1359</a>     <strong class="jxr_keyword">for</strong> (Entry&lt;WorkerInfo, List&lt;Integer&gt;&gt; workerPartitionList :
+<a name="1360" href="#1360">1360</a>       randomEntryList) {
+<a name="1361" href="#1361">1361</a>       <strong class="jxr_keyword">for</strong> (Integer partitionId : workerPartitionList.getValue()) {
+<a name="1362" href="#1362">1362</a>         Partition&lt;I, V, E, M&gt; partition =
+<a name="1363" href="#1363">1363</a>             getPartitionMap().get(partitionId);
+<a name="1364" href="#1364">1364</a>         <strong class="jxr_keyword">if</strong> (partition == <strong class="jxr_keyword">null</strong>) {
+<a name="1365" href="#1365">1365</a>           <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1366" href="#1366">1366</a>               <span class="jxr_string">"sendWorkerPartitions: Couldn't find partition "</span> +
+<a name="1367" href="#1367">1367</a>                   partitionId + <span class="jxr_string">" to send to "</span> +
+<a name="1368" href="#1368">1368</a>                   workerPartitionList.getKey());
+<a name="1369" href="#1369">1369</a>         }
+<a name="1370" href="#1370">1370</a>         <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1371" href="#1371">1371</a>           LOG.info(<span class="jxr_string">"sendWorkerPartitions: Sending worker "</span> +
+<a name="1372" href="#1372">1372</a>               workerPartitionList.getKey() + <span class="jxr_string">" partition "</span> +
+<a name="1373" href="#1373">1373</a>               partitionId);
+<a name="1374" href="#1374">1374</a>         }
+<a name="1375" href="#1375">1375</a>         getGraphMapper().getGraphState().getWorkerCommunications().
+<a name="1376" href="#1376">1376</a>             sendPartitionRequest(workerPartitionList.getKey(),
+<a name="1377" href="#1377">1377</a>                 partition);
+<a name="1378" href="#1378">1378</a>         getPartitionMap().remove(partitionId);
+<a name="1379" href="#1379">1379</a>       }
+<a name="1380" href="#1380">1380</a>     }
+<a name="1381" href="#1381">1381</a> 
+<a name="1382" href="#1382">1382</a>     <strong class="jxr_keyword">try</strong> {
+<a name="1383" href="#1383">1383</a>       getGraphMapper().getGraphState().getWorkerCommunications().flush();
+<a name="1384" href="#1384">1384</a>     } <strong class="jxr_keyword">catch</strong> (IOException e) {
+<a name="1385" href="#1385">1385</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(<span class="jxr_string">"sendWorkerPartitions: Flush failed"</span>, e);
+<a name="1386" href="#1386">1386</a>     }
+<a name="1387" href="#1387">1387</a>     String myPartitionExchangeDonePath =
+<a name="1388" href="#1388">1388</a>         getPartitionExchangeWorkerPath(
+<a name="1389" href="#1389">1389</a>             getApplicationAttempt(), getSuperstep(), getWorkerInfo());
+<a name="1390" href="#1390">1390</a>     <strong class="jxr_keyword">try</strong> {
+<a name="1391" href="#1391">1391</a>       getZkExt().createExt(myPartitionExchangeDonePath,
+<a name="1392" href="#1392">1392</a>           <strong class="jxr_keyword">null</strong>,
+<a name="1393" href="#1393">1393</a>           Ids.OPEN_ACL_UNSAFE,
+<a name="1394" href="#1394">1394</a>           CreateMode.PERSISTENT,
+<a name="1395" href="#1395">1395</a>           <strong class="jxr_keyword">true</strong>);
+<a name="1396" href="#1396">1396</a>     } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
+<a name="1397" href="#1397">1397</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1398" href="#1398">1398</a>           <span class="jxr_string">"sendWorkerPartitions: KeeperException to create "</span> +
+<a name="1399" href="#1399">1399</a>               myPartitionExchangeDonePath, e);
+<a name="1400" href="#1400">1400</a>     } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
+<a name="1401" href="#1401">1401</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1402" href="#1402">1402</a>           <span class="jxr_string">"sendWorkerPartitions: InterruptedException to create "</span> +
+<a name="1403" href="#1403">1403</a>               myPartitionExchangeDonePath, e);
+<a name="1404" href="#1404">1404</a>     }
+<a name="1405" href="#1405">1405</a>     <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1406" href="#1406">1406</a>       LOG.info(<span class="jxr_string">"sendWorkerPartitions: Done sending all my partitions."</span>);
 <a name="1407" href="#1407">1407</a>     }
-<a name="1408" href="#1408">1408</a> 
-<a name="1409" href="#1409">1409</a>     String vertexExchangePath =
-<a name="1410" href="#1410">1410</a>         getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
-<a name="1411" href="#1411">1411</a>     List&lt;String&gt; workerDoneList;
-<a name="1412" href="#1412">1412</a>     <strong class="jxr_keyword">try</strong> {
-<a name="1413" href="#1413">1413</a>       <strong class="jxr_keyword">while</strong> (<strong class="jxr_keyword">true</strong>) {
-<a name="1414" href="#1414">1414</a>         workerDoneList = getZkExt().getChildrenExt(
-<a name="1415" href="#1415">1415</a>             vertexExchangePath, <strong class="jxr_keyword">true</strong>, false, false);
-<a name="1416" href="#1416">1416</a>         workerIdSet.removeAll(workerDoneList);
-<a name="1417" href="#1417">1417</a>         <strong class="jxr_keyword">if</strong> (workerIdSet.isEmpty()) {
-<a name="1418" href="#1418">1418</a>           <strong class="jxr_keyword">break</strong>;
-<a name="1419" href="#1419">1419</a>         }
-<a name="1420" href="#1420">1420</a>         <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1421" href="#1421">1421</a>           LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Waiting for workers "</span> +
-<a name="1422" href="#1422">1422</a>               workerIdSet);
-<a name="1423" href="#1423">1423</a>         }
-<a name="1424" href="#1424">1424</a>         getPartitionExchangeChildrenChangedEvent().waitForever();
-<a name="1425" href="#1425">1425</a>         getPartitionExchangeChildrenChangedEvent().reset();
-<a name="1426" href="#1426">1426</a>       }
-<a name="1427" href="#1427">1427</a>     } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
-<a name="1428" href="#1428">1428</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(e);
-<a name="1429" href="#1429">1429</a>     } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
-<a name="1430" href="#1430">1430</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(e);
-<a name="1431" href="#1431">1431</a>     }
-<a name="1432" href="#1432">1432</a> 
-<a name="1433" href="#1433">1433</a>     <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1434" href="#1434">1434</a>       LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Done with exchange."</span>);
-<a name="1435" href="#1435">1435</a>     }
-<a name="1436" href="#1436">1436</a> 
-<a name="1437" href="#1437">1437</a>     <em class="jxr_comment">// Add the partitions sent earlier</em>
-<a name="1438" href="#1438">1438</a>     movePartitionsToWorker(commService);
-<a name="1439" href="#1439">1439</a>   }
-<a name="1440" href="#1440">1440</a> 
-<a name="1441" href="#1441">1441</a>   <em class="jxr_javadoccomment">/**</em>
-<a name="1442" href="#1442">1442</a> <em class="jxr_javadoccomment">   * Partitions that are exchanged need to be moved from the communication</em>
-<a name="1443" href="#1443">1443</a> <em class="jxr_javadoccomment">   * service to the worker.</em>
-<a name="1444" href="#1444">1444</a> <em class="jxr_javadoccomment">   *</em>
-<a name="1445" href="#1445">1445</a> <em class="jxr_javadoccomment">   * @param commService Communication service where the partitions are</em>
-<a name="1446" href="#1446">1446</a> <em class="jxr_javadoccomment">   *        temporarily stored.</em>
-<a name="1447" href="#1447">1447</a> <em class="jxr_javadoccomment">   */</em>
-<a name="1448" href="#1448">1448</a>   <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> movePartitionsToWorker(
-<a name="1449" href="#1449">1449</a>       WorkerServer&lt;I, V, E, M&gt; commService) {
-<a name="1450" href="#1450">1450</a>     Map&lt;Integer, Collection&lt;Vertex&lt;I, V, E, M&gt;&gt;&gt; inPartitionVertexMap =
-<a name="1451" href="#1451">1451</a>         commService.getInPartitionVertexMap();
-<a name="1452" href="#1452">1452</a>     <strong class="jxr_keyword">synchronized</strong> (inPartitionVertexMap) {
-<a name="1453" href="#1453">1453</a>       <strong class="jxr_keyword">for</strong> (Entry&lt;Integer, Collection&lt;Vertex&lt;I, V, E, M&gt;&gt;&gt; entry :
-<a name="1454" href="#1454">1454</a>         inPartitionVertexMap.entrySet()) {
-<a name="1455" href="#1455">1455</a>         <strong class="jxr_keyword">if</strong> (getPartitionMap().containsKey(entry.getKey())) {
-<a name="1456" href="#1456">1456</a>           <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1457" href="#1457">1457</a>               <span class="jxr_string">"moveVerticesToWorker: Already has partition "</span> +
-<a name="1458" href="#1458">1458</a>                   getPartitionMap().get(entry.getKey()) +
-<a name="1459" href="#1459">1459</a>                   <span class="jxr_string">", cannot receive vertex list of size "</span> +
-<a name="1460" href="#1460">1460</a>                   entry.getValue().size());
+<a name="1408" href="#1408">1408</a>   }
+<a name="1409" href="#1409">1409</a> 
+<a name="1410" href="#1410">1410</a>   @Override
+<a name="1411" href="#1411">1411</a>   <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">final</strong> <strong class="jxr_keyword">void</strong> exchangeVertexPartitions(
+<a name="1412" href="#1412">1412</a>       Collection&lt;? <strong class="jxr_keyword">extends</strong> PartitionOwner&gt; masterSetPartitionOwners) {
+<a name="1413" href="#1413">1413</a>     <em class="jxr_comment">// 1. Fix the addresses of the partition ids if they have changed.</em>
+<a name="1414" href="#1414">1414</a>     <em class="jxr_comment">// 2. Send all the partitions to their destination workers in a random</em>
+<a name="1415" href="#1415">1415</a>     <em class="jxr_comment">//    fashion.</em>
+<a name="1416" href="#1416">1416</a>     <em class="jxr_comment">// 3. Notify completion with a ZooKeeper stamp</em>
+<a name="1417" href="#1417">1417</a>     <em class="jxr_comment">// 4. Wait for all my dependencies to be done (if any)</em>
+<a name="1418" href="#1418">1418</a>     <em class="jxr_comment">// 5. Add the partitions to myself.</em>
+<a name="1419" href="#1419">1419</a>     <a href="../../../../org/apache/giraph/graph/partition/PartitionExchange.html">PartitionExchange</a> partitionExchange =
+<a name="1420" href="#1420">1420</a>         workerGraphPartitioner.updatePartitionOwners(
+<a name="1421" href="#1421">1421</a>             getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+<a name="1422" href="#1422">1422</a>     commService.fixPartitionIdToSocketAddrMap();
+<a name="1423" href="#1423">1423</a> 
+<a name="1424" href="#1424">1424</a>     Map&lt;WorkerInfo, List&lt;Integer&gt;&gt; sendWorkerPartitionMap =
+<a name="1425" href="#1425">1425</a>         partitionExchange.getSendWorkerPartitionMap();
+<a name="1426" href="#1426">1426</a>     <strong class="jxr_keyword">if</strong> (!workerPartitionMap.isEmpty()) {
+<a name="1427" href="#1427">1427</a>       sendWorkerPartitions(sendWorkerPartitionMap);
+<a name="1428" href="#1428">1428</a>     }
+<a name="1429" href="#1429">1429</a> 
+<a name="1430" href="#1430">1430</a>     Set&lt;WorkerInfo&gt; myDependencyWorkerSet =
+<a name="1431" href="#1431">1431</a>         partitionExchange.getMyDependencyWorkerSet();
+<a name="1432" href="#1432">1432</a>     Set&lt;String&gt; workerIdSet = <strong class="jxr_keyword">new</strong> HashSet&lt;String&gt;();
+<a name="1433" href="#1433">1433</a>     <strong class="jxr_keyword">for</strong> (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
+<a name="1434" href="#1434">1434</a>       <strong class="jxr_keyword">if</strong> (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
+<a name="1435" href="#1435">1435</a>         <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1436" href="#1436">1436</a>             <span class="jxr_string">"exchangeVertexPartitions: Duplicate entry "</span> + tmpWorkerInfo);
+<a name="1437" href="#1437">1437</a>       }
+<a name="1438" href="#1438">1438</a>     }
+<a name="1439" href="#1439">1439</a>     <strong class="jxr_keyword">if</strong> (myDependencyWorkerSet.isEmpty() &amp;&amp; workerPartitionMap.isEmpty()) {
+<a name="1440" href="#1440">1440</a>       <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1441" href="#1441">1441</a>         LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Nothing to exchange, "</span> +
+<a name="1442" href="#1442">1442</a>             <span class="jxr_string">"exiting early"</span>);
+<a name="1443" href="#1443">1443</a>       }
+<a name="1444" href="#1444">1444</a>       <strong class="jxr_keyword">return</strong>;
+<a name="1445" href="#1445">1445</a>     }
+<a name="1446" href="#1446">1446</a> 
+<a name="1447" href="#1447">1447</a>     String vertexExchangePath =
+<a name="1448" href="#1448">1448</a>         getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
+<a name="1449" href="#1449">1449</a>     List&lt;String&gt; workerDoneList;
+<a name="1450" href="#1450">1450</a>     <strong class="jxr_keyword">try</strong> {
+<a name="1451" href="#1451">1451</a>       <strong class="jxr_keyword">while</strong> (<strong class="jxr_keyword">true</strong>) {
+<a name="1452" href="#1452">1452</a>         workerDoneList = getZkExt().getChildrenExt(
+<a name="1453" href="#1453">1453</a>             vertexExchangePath, <strong class="jxr_keyword">true</strong>, false, false);
+<a name="1454" href="#1454">1454</a>         workerIdSet.removeAll(workerDoneList);
+<a name="1455" href="#1455">1455</a>         <strong class="jxr_keyword">if</strong> (workerIdSet.isEmpty()) {
+<a name="1456" href="#1456">1456</a>           <strong class="jxr_keyword">break</strong>;
+<a name="1457" href="#1457">1457</a>         }
+<a name="1458" href="#1458">1458</a>         <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1459" href="#1459">1459</a>           LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Waiting for workers "</span> +
+<a name="1460" href="#1460">1460</a>               workerIdSet);
 <a name="1461" href="#1461">1461</a>         }
-<a name="1462" href="#1462">1462</a> 
-<a name="1463" href="#1463">1463</a>         Partition&lt;I, V, E, M&gt; tmpPartition =
-<a name="1464" href="#1464">1464</a>             <strong class="jxr_keyword">new</strong> Partition&lt;I, V, E, M&gt;(getConfiguration(),
-<a name="1465" href="#1465">1465</a>                 entry.getKey());
-<a name="1466" href="#1466">1466</a>         <strong class="jxr_keyword">synchronized</strong> (entry.getValue()) {
-<a name="1467" href="#1467">1467</a>           <strong class="jxr_keyword">for</strong> (Vertex&lt;I, V, E, M&gt; vertex : entry.getValue()) {
-<a name="1468" href="#1468">1468</a>             <strong class="jxr_keyword">if</strong> (tmpPartition.putVertex(vertex) != <strong class="jxr_keyword">null</strong>) {
-<a name="1469" href="#1469">1469</a>               <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1470" href="#1470">1470</a>                   <span class="jxr_string">"moveVerticesToWorker: Vertex "</span> + vertex +
-<a name="1471" href="#1471">1471</a>                   <span class="jxr_string">" already exists!"</span>);
-<a name="1472" href="#1472">1472</a>             }
-<a name="1473" href="#1473">1473</a>           }
-<a name="1474" href="#1474">1474</a>           <strong class="jxr_keyword">if</strong> (LOG.isDebugEnabled()) {
-<a name="1475" href="#1475">1475</a>             LOG.debug(<span class="jxr_string">"movePartitionsToWorker: Adding "</span> +
-<a name="1476" href="#1476">1476</a>                 entry.getValue().size() +
-<a name="1477" href="#1477">1477</a>                 <span class="jxr_string">" vertices for partition id "</span> + entry.getKey());
-<a name="1478" href="#1478">1478</a>           }
-<a name="1479" href="#1479">1479</a>           getPartitionMap().put(tmpPartition.getId(),
-<a name="1480" href="#1480">1480</a>               tmpPartition);
-<a name="1481" href="#1481">1481</a>           entry.getValue().clear();
-<a name="1482" href="#1482">1482</a>         }
-<a name="1483" href="#1483">1483</a>       }
-<a name="1484" href="#1484">1484</a>       inPartitionVertexMap.clear();
-<a name="1485" href="#1485">1485</a>     }
-<a name="1486" href="#1486">1486</a>   }
-<a name="1487" href="#1487">1487</a> 
-<a name="1488" href="#1488">1488</a>   <em class="jxr_javadoccomment">/**</em>
-<a name="1489" href="#1489">1489</a> <em class="jxr_javadoccomment">   * Get event when the state of a partition exchange has changed.</em>
-<a name="1490" href="#1490">1490</a> <em class="jxr_javadoccomment">   *</em>
-<a name="1491" href="#1491">1491</a> <em class="jxr_javadoccomment">   * @return Event to check.</em>
-<a name="1492" href="#1492">1492</a> <em class="jxr_javadoccomment">   */</em>
-<a name="1493" href="#1493">1493</a>   <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">final</strong> <a href="../../../../org/apache/giraph/zk/BspEvent.html">BspEvent</a> getPartitionExchangeChildrenChangedEvent() {
-<a name="1494" href="#1494">1494</a>     <strong class="jxr_keyword">return</strong> partitionExchangeChildrenChanged;
-<a name="1495" href="#1495">1495</a>   }
-<a name="1496" href="#1496">1496</a> 
-<a name="1497" href="#1497">1497</a>   @Override
-<a name="1498" href="#1498">1498</a>   <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">boolean</strong> processEvent(WatchedEvent event) {
-<a name="1499" href="#1499">1499</a>     <strong class="jxr_keyword">boolean</strong> foundEvent = false;
-<a name="1500" href="#1500">1500</a>     <strong class="jxr_keyword">if</strong> (event.getPath().startsWith(masterJobStatePath) &amp;&amp;
-<a name="1501" href="#1501">1501</a>         (event.getType() == EventType.NodeChildrenChanged)) {
-<a name="1502" href="#1502">1502</a>       <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1503" href="#1503">1503</a>         LOG.info(<span class="jxr_string">"processEvent: Job state changed, checking "</span> +
-<a name="1504" href="#1504">1504</a>             <span class="jxr_string">"to see if it needs to restart"</span>);
-<a name="1505" href="#1505">1505</a>       }
-<a name="1506" href="#1506">1506</a>       JSONObject jsonObj = getJobState();
-<a name="1507" href="#1507">1507</a>       <strong class="jxr_keyword">try</strong> {
-<a name="1508" href="#1508">1508</a>         <strong class="jxr_keyword">if</strong> ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
-<a name="1509" href="#1509">1509</a>             ApplicationState.START_SUPERSTEP) &amp;&amp;
-<a name="1510" href="#1510">1510</a>             jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
-<a name="1511" href="#1511">1511</a>             getApplicationAttempt()) {
-<a name="1512" href="#1512">1512</a>           LOG.fatal(<span class="jxr_string">"processEvent: Worker will restart "</span> +
-<a name="1513" href="#1513">1513</a>               <span class="jxr_string">"from command - "</span> + jsonObj.toString());
-<a name="1514" href="#1514">1514</a>           System.exit(-1);
-<a name="1515" href="#1515">1515</a>         }
-<a name="1516" href="#1516">1516</a>       } <strong class="jxr_keyword">catch</strong> (JSONException e) {
-<a name="1517" href="#1517">1517</a>         <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
-<a name="1518" href="#1518">1518</a>             <span class="jxr_string">"processEvent: Couldn't properly get job state from "</span> +
-<a name="1519" href="#1519">1519</a>                 jsonObj.toString());
-<a name="1520" href="#1520">1520</a>       }
-<a name="1521" href="#1521">1521</a>       foundEvent = <strong class="jxr_keyword">true</strong>;
-<a name="1522" href="#1522">1522</a>     } <strong class="jxr_keyword">else</strong> <strong class="jxr_keyword">if</strong> (event.getPath().contains(PARTITION_EXCHANGE_DIR) &amp;&amp;
-<a name="1523" href="#1523">1523</a>         event.getType() == EventType.NodeChildrenChanged) {
-<a name="1524" href="#1524">1524</a>       <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1525" href="#1525">1525</a>         LOG.info(<span class="jxr_string">"processEvent : partitionExchangeChildrenChanged "</span> +
-<a name="1526" href="#1526">1526</a>             <span class="jxr_string">"(at least one worker is done sending partitions)"</span>);
-<a name="1527" href="#1527">1527</a>       }
-<a name="1528" href="#1528">1528</a>       partitionExchangeChildrenChanged.signal();
-<a name="1529" href="#1529">1529</a>       foundEvent = <strong class="jxr_keyword">true</strong>;
-<a name="1530" href="#1530">1530</a>     }
-<a name="1531" href="#1531">1531</a> 
-<a name="1532" href="#1532">1532</a>     <strong class="jxr_keyword">return</strong> foundEvent;
+<a name="1462" href="#1462">1462</a>         getPartitionExchangeChildrenChangedEvent().waitForever();
+<a name="1463" href="#1463">1463</a>         getPartitionExchangeChildrenChangedEvent().reset();
+<a name="1464" href="#1464">1464</a>       }
+<a name="1465" href="#1465">1465</a>     } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
+<a name="1466" href="#1466">1466</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(e);
+<a name="1467" href="#1467">1467</a>     } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
+<a name="1468" href="#1468">1468</a>       <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(e);
+<a name="1469" href="#1469">1469</a>     }
+<a name="1470" href="#1470">1470</a> 
+<a name="1471" href="#1471">1471</a>     <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1472" href="#1472">1472</a>       LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Done with exchange."</span>);
+<a name="1473" href="#1473">1473</a>     }
+<a name="1474" href="#1474">1474</a> 
+<a name="1475" href="#1475">1475</a>     <em class="jxr_comment">// Add the partitions sent earlier</em>
+<a name="1476" href="#1476">1476</a>     movePartitionsToWorker(commService);
+<a name="1477" href="#1477">1477</a>   }
+<a name="1478" href="#1478">1478</a> 
+<a name="1479" href="#1479">1479</a>   <em class="jxr_javadoccomment">/**</em>
+<a name="1480" href="#1480">1480</a> <em class="jxr_javadoccomment">   * Partitions that are exchanged need to be moved from the communication</em>
+<a name="1481" href="#1481">1481</a> <em class="jxr_javadoccomment">   * service to the worker.</em>
+<a name="1482" href="#1482">1482</a> <em class="jxr_javadoccomment">   *</em>
+<a name="1483" href="#1483">1483</a> <em class="jxr_javadoccomment">   * @param commService Communication service where the partitions are</em>
+<a name="1484" href="#1484">1484</a> <em class="jxr_javadoccomment">   *        temporarily stored.</em>
+<a name="1485" href="#1485">1485</a> <em class="jxr_javadoccomment">   */</em>
+<a name="1486" href="#1486">1486</a>   <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> movePartitionsToWorker(
+<a name="1487" href="#1487">1487</a>       WorkerServer&lt;I, V, E, M&gt; commService) {
+<a name="1488" href="#1488">1488</a>     Map&lt;Integer, Collection&lt;Vertex&lt;I, V, E, M&gt;&gt;&gt; inPartitionVertexMap =
+<a name="1489" href="#1489">1489</a>         commService.getInPartitionVertexMap();
+<a name="1490" href="#1490">1490</a>     <strong class="jxr_keyword">synchronized</strong> (inPartitionVertexMap) {
+<a name="1491" href="#1491">1491</a>       <strong class="jxr_keyword">for</strong> (Entry&lt;Integer, Collection&lt;Vertex&lt;I, V, E, M&gt;&gt;&gt; entry :
+<a name="1492" href="#1492">1492</a>         inPartitionVertexMap.entrySet()) {
+<a name="1493" href="#1493">1493</a>         <strong class="jxr_keyword">if</strong> (getPartitionMap().containsKey(entry.getKey())) {
+<a name="1494" href="#1494">1494</a>           <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1495" href="#1495">1495</a>               <span class="jxr_string">"moveVerticesToWorker: Already has partition "</span> +
+<a name="1496" href="#1496">1496</a>                   getPartitionMap().get(entry.getKey()) +
+<a name="1497" href="#1497">1497</a>                   <span class="jxr_string">", cannot receive vertex list of size "</span> +
+<a name="1498" href="#1498">1498</a>                   entry.getValue().size());
+<a name="1499" href="#1499">1499</a>         }
+<a name="1500" href="#1500">1500</a> 
+<a name="1501" href="#1501">1501</a>         Partition&lt;I, V, E, M&gt; tmpPartition =
+<a name="1502" href="#1502">1502</a>             <strong class="jxr_keyword">new</strong> Partition&lt;I, V, E, M&gt;(getConfiguration(),
+<a name="1503" href="#1503">1503</a>                 entry.getKey());
+<a name="1504" href="#1504">1504</a>         <strong class="jxr_keyword">synchronized</strong> (entry.getValue()) {
+<a name="1505" href="#1505">1505</a>           <strong class="jxr_keyword">for</strong> (Vertex&lt;I, V, E, M&gt; vertex : entry.getValue()) {
+<a name="1506" href="#1506">1506</a>             <strong class="jxr_keyword">if</strong> (tmpPartition.putVertex(vertex) != <strong class="jxr_keyword">null</strong>) {
+<a name="1507" href="#1507">1507</a>               <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1508" href="#1508">1508</a>                   <span class="jxr_string">"moveVerticesToWorker: Vertex "</span> + vertex +
+<a name="1509" href="#1509">1509</a>                   <span class="jxr_string">" already exists!"</span>);
+<a name="1510" href="#1510">1510</a>             }
+<a name="1511" href="#1511">1511</a>           }
+<a name="1512" href="#1512">1512</a>           <strong class="jxr_keyword">if</strong> (LOG.isDebugEnabled()) {
+<a name="1513" href="#1513">1513</a>             LOG.debug(<span class="jxr_string">"movePartitionsToWorker: Adding "</span> +
+<a name="1514" href="#1514">1514</a>                 entry.getValue().size() +
+<a name="1515" href="#1515">1515</a>                 <span class="jxr_string">" vertices for partition id "</span> + entry.getKey());
+<a name="1516" href="#1516">1516</a>           }
+<a name="1517" href="#1517">1517</a>           getPartitionMap().put(tmpPartition.getId(),
+<a name="1518" href="#1518">1518</a>               tmpPartition);
+<a name="1519" href="#1519">1519</a>           entry.getValue().clear();
+<a name="1520" href="#1520">1520</a>         }
+<a name="1521" href="#1521">1521</a>       }
+<a name="1522" href="#1522">1522</a>       inPartitionVertexMap.clear();
+<a name="1523" href="#1523">1523</a>     }
+<a name="1524" href="#1524">1524</a>   }
+<a name="1525" href="#1525">1525</a> 
+<a name="1526" href="#1526">1526</a>   <em class="jxr_javadoccomment">/**</em>
+<a name="1527" href="#1527">1527</a> <em class="jxr_javadoccomment">   * Get event when the state of a partition exchange has changed.</em>
+<a name="1528" href="#1528">1528</a> <em class="jxr_javadoccomment">   *</em>
+<a name="1529" href="#1529">1529</a> <em class="jxr_javadoccomment">   * @return Event to check.</em>
+<a name="1530" href="#1530">1530</a> <em class="jxr_javadoccomment">   */</em>
+<a name="1531" href="#1531">1531</a>   <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">final</strong> <a href="../../../../org/apache/giraph/zk/BspEvent.html">BspEvent</a> getPartitionExchangeChildrenChangedEvent() {
+<a name="1532" href="#1532">1532</a>     <strong class="jxr_keyword">return</strong> partitionExchangeChildrenChanged;
 <a name="1533" href="#1533">1533</a>   }
 <a name="1534" href="#1534">1534</a> 
 <a name="1535" href="#1535">1535</a>   @Override
-<a name="1536" href="#1536">1536</a>   <strong class="jxr_keyword">public</strong> <a href="../../../../org/apache/giraph/graph/WorkerInfo.html">WorkerInfo</a> getWorkerInfo() {
-<a name="1537" href="#1537">1537</a>     <strong class="jxr_keyword">return</strong> workerInfo;
-<a name="1538" href="#1538">1538</a>   }
-<a name="1539" href="#1539">1539</a> 
-<a name="1540" href="#1540">1540</a>   @Override
-<a name="1541" href="#1541">1541</a>   <strong class="jxr_keyword">public</strong> Map&lt;Integer, Partition&lt;I, V, E, M&gt;&gt; getPartitionMap() {
-<a name="1542" href="#1542">1542</a>     <strong class="jxr_keyword">return</strong> workerPartitionMap;
-<a name="1543" href="#1543">1543</a>   }
-<a name="1544" href="#1544">1544</a> 
-<a name="1545" href="#1545">1545</a>   @Override
-<a name="1546" href="#1546">1546</a>   <strong class="jxr_keyword">public</strong> Collection&lt;? <strong class="jxr_keyword">extends</strong> PartitionOwner&gt; getPartitionOwners() {
-<a name="1547" href="#1547">1547</a>     <strong class="jxr_keyword">return</strong> workerGraphPartitioner.getPartitionOwners();
-<a name="1548" href="#1548">1548</a>   }
-<a name="1549" href="#1549">1549</a> 
-<a name="1550" href="#1550">1550</a>   @Override
-<a name="1551" href="#1551">1551</a>   <strong class="jxr_keyword">public</strong> <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> getVertexPartitionOwner(I vertexId) {
-<a name="1552" href="#1552">1552</a>     <strong class="jxr_keyword">return</strong> workerGraphPartitioner.getPartitionOwner(vertexId);
-<a name="1553" href="#1553">1553</a>   }
-<a name="1554" href="#1554">1554</a> 
-<a name="1555" href="#1555">1555</a>   <em class="jxr_javadoccomment">/**</em>
-<a name="1556" href="#1556">1556</a> <em class="jxr_javadoccomment">   * Get the partition for a vertex index.</em>
-<a name="1557" href="#1557">1557</a> <em class="jxr_javadoccomment">   *</em>
-<a name="1558" href="#1558">1558</a> <em class="jxr_javadoccomment">   * @param vertexId Vertex index to search for the partition.</em>
-<a name="1559" href="#1559">1559</a> <em class="jxr_javadoccomment">   * @return Partition that owns this vertex.</em>
-<a name="1560" href="#1560">1560</a> <em class="jxr_javadoccomment">   */</em>
-<a name="1561" href="#1561">1561</a>   <strong class="jxr_keyword">public</strong> Partition&lt;I, V, E, M&gt; getPartition(I vertexId) {
-<a name="1562" href="#1562">1562</a>     <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> partitionOwner = getVertexPartitionOwner(vertexId);
-<a name="1563" href="#1563">1563</a>     <strong class="jxr_keyword">return</strong> workerPartitionMap.get(partitionOwner.getPartitionId());
-<a name="1564" href="#1564">1564</a>   }
-<a name="1565" href="#1565">1565</a> 
-<a name="1566" href="#1566">1566</a>   @Override
-<a name="1567" href="#1567">1567</a>   <strong class="jxr_keyword">public</strong> Vertex&lt;I, V, E, M&gt; getVertex(I vertexId) {
-<a name="1568" href="#1568">1568</a>     <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> partitionOwner = getVertexPartitionOwner(vertexId);
-<a name="1569" href="#1569">1569</a>     <strong class="jxr_keyword">if</strong> (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
-<a name="1570" href="#1570">1570</a>       <strong class="jxr_keyword">return</strong> workerPartitionMap.get(
-<a name="1571" href="#1571">1571</a>           partitionOwner.getPartitionId()).getVertex(vertexId);
-<a name="1572" href="#1572">1572</a>     } <strong class="jxr_keyword">else</strong> {
-<a name="1573" href="#1573">1573</a>       <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">null</strong>;
-<a name="1574" href="#1574">1574</a>     }
-<a name="1575" href="#1575">1575</a>   }
-<a name="1576" href="#1576">1576</a> 
-<a name="1577" href="#1577">1577</a>   @Override
-<a name="1578" href="#1578">1578</a>   <strong class="jxr_keyword">public</strong> ServerData&lt;I, V, E, M&gt; getServerData() {
-<a name="1579" href="#1579">1579</a>     <strong class="jxr_keyword">return</strong> commService.getServerData();
-<a name="1580" href="#1580">1580</a>   }
-<a name="1581" href="#1581">1581</a> }
+<a name="1536" href="#1536">1536</a>   <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">boolean</strong> processEvent(WatchedEvent event) {
+<a name="1537" href="#1537">1537</a>     <strong class="jxr_keyword">boolean</strong> foundEvent = false;
+<a name="1538" href="#1538">1538</a>     <strong class="jxr_keyword">if</strong> (event.getPath().startsWith(masterJobStatePath) &amp;&amp;
+<a name="1539" href="#1539">1539</a>         (event.getType() == EventType.NodeChildrenChanged)) {
+<a name="1540" href="#1540">1540</a>       <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1541" href="#1541">1541</a>         LOG.info(<span class="jxr_string">"processEvent: Job state changed, checking "</span> +
+<a name="1542" href="#1542">1542</a>             <span class="jxr_string">"to see if it needs to restart"</span>);
+<a name="1543" href="#1543">1543</a>       }
+<a name="1544" href="#1544">1544</a>       JSONObject jsonObj = getJobState();
+<a name="1545" href="#1545">1545</a>       <strong class="jxr_keyword">try</strong> {
+<a name="1546" href="#1546">1546</a>         <strong class="jxr_keyword">if</strong> ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
+<a name="1547" href="#1547">1547</a>             ApplicationState.START_SUPERSTEP) &amp;&amp;
+<a name="1548" href="#1548">1548</a>             jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
+<a name="1549" href="#1549">1549</a>             getApplicationAttempt()) {
+<a name="1550" href="#1550">1550</a>           LOG.fatal(<span class="jxr_string">"processEvent: Worker will restart "</span> +
+<a name="1551" href="#1551">1551</a>               <span class="jxr_string">"from command - "</span> + jsonObj.toString());
+<a name="1552" href="#1552">1552</a>           System.exit(-1);
+<a name="1553" href="#1553">1553</a>         }
+<a name="1554" href="#1554">1554</a>       } <strong class="jxr_keyword">catch</strong> (JSONException e) {
+<a name="1555" href="#1555">1555</a>         <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
+<a name="1556" href="#1556">1556</a>             <span class="jxr_string">"processEvent: Couldn't properly get job state from "</span> +
+<a name="1557" href="#1557">1557</a>                 jsonObj.toString());
+<a name="1558" href="#1558">1558</a>       }
+<a name="1559" href="#1559">1559</a>       foundEvent = <strong class="jxr_keyword">true</strong>;
+<a name="1560" href="#1560">1560</a>     } <strong class="jxr_keyword">else</strong> <strong class="jxr_keyword">if</strong> (event.getPath().contains(PARTITION_EXCHANGE_DIR) &amp;&amp;
+<a name="1561" href="#1561">1561</a>         event.getType() == EventType.NodeChildrenChanged) {
+<a name="1562" href="#1562">1562</a>       <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1563" href="#1563">1563</a>         LOG.info(<span class="jxr_string">"processEvent : partitionExchangeChildrenChanged "</span> +
+<a name="1564" href="#1564">1564</a>             <span class="jxr_string">"(at least one worker is done sending partitions)"</span>);
+<a name="1565" href="#1565">1565</a>       }
+<a name="1566" href="#1566">1566</a>       partitionExchangeChildrenChanged.signal();
+<a name="1567" href="#1567">1567</a>       foundEvent = <strong class="jxr_keyword">true</strong>;
+<a name="1568" href="#1568">1568</a>     }
+<a name="1569" href="#1569">1569</a> 
+<a name="1570" href="#1570">1570</a>     <strong class="jxr_keyword">return</strong> foundEvent;
+<a name="1571" href="#1571">1571</a>   }
+<a name="1572" href="#1572">1572</a> 
+<a name="1573" href="#1573">1573</a>   @Override
+<a name="1574" href="#1574">1574</a>   <strong class="jxr_keyword">public</strong> <a href="../../../../org/apache/giraph/graph/WorkerInfo.html">WorkerInfo</a> getWorkerInfo() {
+<a name="1575" href="#1575">1575</a>     <strong class="jxr_keyword">return</strong> workerInfo;
+<a name="1576" href="#1576">1576</a>   }
+<a name="1577" href="#1577">1577</a> 
+<a name="1578" href="#1578">1578</a>   @Override
+<a name="1579" href="#1579">1579</a>   <strong class="jxr_keyword">public</strong> Map&lt;Integer, Partition&lt;I, V, E, M&gt;&gt; getPartitionMap() {
+<a name="1580" href="#1580">1580</a>     <strong class="jxr_keyword">return</strong> workerPartitionMap;
+<a name="1581" href="#1581">1581</a>   }
+<a name="1582" href="#1582">1582</a> 
+<a name="1583" href="#1583">1583</a>   @Override
+<a name="1584" href="#1584">1584</a>   <strong class="jxr_keyword">public</strong> Collection&lt;? <strong class="jxr_keyword">extends</strong> PartitionOwner&gt; getPartitionOwners() {
+<a name="1585" href="#1585">1585</a>     <strong class="jxr_keyword">return</strong> workerGraphPartitioner.getPartitionOwners();
+<a name="1586" href="#1586">1586</a>   }
+<a name="1587" href="#1587">1587</a> 
+<a name="1588" href="#1588">1588</a>   @Override
+<a name="1589" href="#1589">1589</a>   <strong class="jxr_keyword">public</strong> <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> getVertexPartitionOwner(I vertexId) {
+<a name="1590" href="#1590">1590</a>     <strong class="jxr_keyword">return</strong> workerGraphPartitioner.getPartitionOwner(vertexId);
+<a name="1591" href="#1591">1591</a>   }
+<a name="1592" href="#1592">1592</a> 
+<a name="1593" href="#1593">1593</a>   <em class="jxr_javadoccomment">/**</em>
+<a name="1594" href="#1594">1594</a> <em class="jxr_javadoccomment">   * Get the partition for a vertex index.</em>
+<a name="1595" href="#1595">1595</a> <em class="jxr_javadoccomment">   *</em>
+<a name="1596" href="#1596">1596</a> <em class="jxr_javadoccomment">   * @param vertexId Vertex index to search for the partition.</em>
+<a name="1597" href="#1597">1597</a> <em class="jxr_javadoccomment">   * @return Partition that owns this vertex.</em>
+<a name="1598" href="#1598">1598</a> <em class="jxr_javadoccomment">   */</em>
+<a name="1599" href="#1599">1599</a>   <strong class="jxr_keyword">public</strong> Partition&lt;I, V, E, M&gt; getPartition(I vertexId) {
+<a name="1600" href="#1600">1600</a>     <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> partitionOwner = getVertexPartitionOwner(vertexId);
+<a name="1601" href="#1601">1601</a>     <strong class="jxr_keyword">return</strong> workerPartitionMap.get(partitionOwner.getPartitionId());
+<a name="1602" href="#1602">1602</a>   }
+<a name="1603" href="#1603">1603</a> 
+<a name="1604" href="#1604">1604</a>   @Override
+<a name="1605" href="#1605">1605</a>   <strong class="jxr_keyword">public</strong> Vertex&lt;I, V, E, M&gt; getVertex(I vertexId) {
+<a name="1606" href="#1606">1606</a>     <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> partitionOwner = getVertexPartitionOwner(vertexId);
+<a name="1607" href="#1607">1607</a>     <strong class="jxr_keyword">if</strong> (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
+<a name="1608" href="#1608">1608</a>       <strong class="jxr_keyword">return</strong> workerPartitionMap.get(
+<a name="1609" href="#1609">1609</a>           partitionOwner.getPartitionId()).getVertex(vertexId);
+<a name="1610" href="#1610">1610</a>     } <strong class="jxr_keyword">else</strong> {
+<a name="1611" href="#1611">1611</a>       <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">null</strong>;
+<a name="1612" href="#1612">1612</a>     }
+<a name="1613" href="#1613">1613</a>   }
+<a name="1614" href="#1614">1614</a> 
+<a name="1615" href="#1615">1615</a>   @Override
+<a name="1616" href="#1616">1616</a>   <strong class="jxr_keyword">public</strong> ServerData&lt;I, V, E, M&gt; getServerData() {
+<a name="1617" href="#1617">1617</a>     <strong class="jxr_keyword">return</strong> commService.getServerData();
+<a name="1618" href="#1618">1618</a>   }
+<a name="1619" href="#1619">1619</a> }
 </pre>
 <hr/><div id="footer">This page was automatically generated by <a href="http://maven.apache.org/">Maven</a></div></body>
 </html>