You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ek...@apache.org on 2012/08/16 03:32:58 UTC
svn commit: r1373684 [33/35] - in /giraph/site: ./ apidocs/
apidocs/org/apache/giraph/ apidocs/org/apache/giraph/benchmark/
apidocs/org/apache/giraph/benchmark/class-use/
apidocs/org/apache/giraph/bsp/ apidocs/org/apache/giraph/bsp/class-use/
apidocs/o...
Modified: giraph/site/xref/org/apache/giraph/graph/BspServiceWorker.html
URL: http://svn.apache.org/viewvc/giraph/site/xref/org/apache/giraph/graph/BspServiceWorker.html?rev=1373684&r1=1373683&r2=1373684&view=diff
==============================================================================
--- giraph/site/xref/org/apache/giraph/graph/BspServiceWorker.html (original)
+++ giraph/site/xref/org/apache/giraph/graph/BspServiceWorker.html Thu Aug 16 01:32:41 2012
@@ -1222,373 +1222,411 @@
<a name="1212" href="#1212">1212</a> }
<a name="1213" href="#1213">1213</a>
<a name="1214" href="#1214">1214</a> getFs().createNewFile(validFilePath);
-<a name="1215" href="#1215">1215</a> }
-<a name="1216" href="#1216">1216</a>
-<a name="1217" href="#1217">1217</a> @Override
-<a name="1218" href="#1218">1218</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> loadCheckpoint(<strong class="jxr_keyword">long</strong> superstep) {
-<a name="1219" href="#1219">1219</a> <strong class="jxr_keyword">if</strong> (getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-<a name="1220" href="#1220">1220</a> GiraphJob.USE_NETTY_DEFAULT)) {
-<a name="1221" href="#1221">1221</a> <strong class="jxr_keyword">try</strong> {
-<a name="1222" href="#1222">1222</a> <em class="jxr_comment">// clear old message stores</em>
-<a name="1223" href="#1223">1223</a> getServerData().getIncomingMessageStore().clearAll();
-<a name="1224" href="#1224">1224</a> getServerData().getCurrentMessageStore().clearAll();
-<a name="1225" href="#1225">1225</a> } <strong class="jxr_keyword">catch</strong> (IOException e) {
-<a name="1226" href="#1226">1226</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
-<a name="1227" href="#1227">1227</a> <span class="jxr_string">"loadCheckpoint: Failed to clear message stores "</span>, e);
-<a name="1228" href="#1228">1228</a> }
-<a name="1229" href="#1229">1229</a> }
-<a name="1230" href="#1230">1230</a>
-<a name="1231" href="#1231">1231</a> <em class="jxr_comment">// Algorithm:</em>
-<a name="1232" href="#1232">1232</a> <em class="jxr_comment">// Examine all the partition owners and load the ones</em>
-<a name="1233" href="#1233">1233</a> <em class="jxr_comment">// that match my hostname and id from the master designated checkpoint</em>
-<a name="1234" href="#1234">1234</a> <em class="jxr_comment">// prefixes.</em>
-<a name="1235" href="#1235">1235</a> <strong class="jxr_keyword">long</strong> startPos = 0;
-<a name="1236" href="#1236">1236</a> <strong class="jxr_keyword">int</strong> loadedPartitions = 0;
-<a name="1237" href="#1237">1237</a> <strong class="jxr_keyword">for</strong> (PartitionOwner partitionOwner :
-<a name="1238" href="#1238">1238</a> workerGraphPartitioner.getPartitionOwners()) {
-<a name="1239" href="#1239">1239</a> <strong class="jxr_keyword">if</strong> (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
-<a name="1240" href="#1240">1240</a> String metadataFile =
-<a name="1241" href="#1241">1241</a> partitionOwner.getCheckpointFilesPrefix() +
-<a name="1242" href="#1242">1242</a> CHECKPOINT_METADATA_POSTFIX;
-<a name="1243" href="#1243">1243</a> String partitionsFile =
-<a name="1244" href="#1244">1244</a> partitionOwner.getCheckpointFilesPrefix() +
-<a name="1245" href="#1245">1245</a> CHECKPOINT_VERTICES_POSTFIX;
-<a name="1246" href="#1246">1246</a> <strong class="jxr_keyword">try</strong> {
-<a name="1247" href="#1247">1247</a> <strong class="jxr_keyword">int</strong> partitionId = -1;
-<a name="1248" href="#1248">1248</a> DataInputStream metadataStream =
-<a name="1249" href="#1249">1249</a> getFs().open(<strong class="jxr_keyword">new</strong> Path(metadataFile));
-<a name="1250" href="#1250">1250</a> <strong class="jxr_keyword">int</strong> partitions = metadataStream.readInt();
-<a name="1251" href="#1251">1251</a> <strong class="jxr_keyword">for</strong> (<strong class="jxr_keyword">int</strong> i = 0; i < partitions; ++i) {
-<a name="1252" href="#1252">1252</a> startPos = metadataStream.readLong();
-<a name="1253" href="#1253">1253</a> partitionId = metadataStream.readInt();
-<a name="1254" href="#1254">1254</a> <strong class="jxr_keyword">if</strong> (partitionId == partitionOwner.getPartitionId()) {
-<a name="1255" href="#1255">1255</a> <strong class="jxr_keyword">break</strong>;
-<a name="1256" href="#1256">1256</a> }
-<a name="1257" href="#1257">1257</a> }
-<a name="1258" href="#1258">1258</a> <strong class="jxr_keyword">if</strong> (partitionId != partitionOwner.getPartitionId()) {
-<a name="1259" href="#1259">1259</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1260" href="#1260">1260</a> <span class="jxr_string">"loadCheckpoint: "</span> + partitionOwner +
-<a name="1261" href="#1261">1261</a> <span class="jxr_string">" not found!"</span>);
-<a name="1262" href="#1262">1262</a> }
-<a name="1263" href="#1263">1263</a> metadataStream.close();
-<a name="1264" href="#1264">1264</a> Partition<I, V, E, M> partition =
-<a name="1265" href="#1265">1265</a> <strong class="jxr_keyword">new</strong> Partition<I, V, E, M>(
-<a name="1266" href="#1266">1266</a> getConfiguration(),
-<a name="1267" href="#1267">1267</a> partitionId);
-<a name="1268" href="#1268">1268</a> DataInputStream partitionsStream =
-<a name="1269" href="#1269">1269</a> getFs().open(<strong class="jxr_keyword">new</strong> Path(partitionsFile));
-<a name="1270" href="#1270">1270</a> <strong class="jxr_keyword">if</strong> (partitionsStream.skip(startPos) != startPos) {
-<a name="1271" href="#1271">1271</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1272" href="#1272">1272</a> <span class="jxr_string">"loadCheckpoint: Failed to skip "</span> + startPos +
-<a name="1273" href="#1273">1273</a> <span class="jxr_string">" on "</span> + partitionsFile);
-<a name="1274" href="#1274">1274</a> }
-<a name="1275" href="#1275">1275</a> partition.readFields(partitionsStream);
-<a name="1276" href="#1276">1276</a> <strong class="jxr_keyword">if</strong> (partitionsStream.readBoolean()) {
-<a name="1277" href="#1277">1277</a> getServerData().getCurrentMessageStore().readFieldsForPartition(
-<a name="1278" href="#1278">1278</a> partitionsStream, partitionId);
-<a name="1279" href="#1279">1279</a> }
-<a name="1280" href="#1280">1280</a> partitionsStream.close();
-<a name="1281" href="#1281">1281</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1282" href="#1282">1282</a> LOG.info(<span class="jxr_string">"loadCheckpoint: Loaded partition "</span> +
-<a name="1283" href="#1283">1283</a> partition);
-<a name="1284" href="#1284">1284</a> }
-<a name="1285" href="#1285">1285</a> <strong class="jxr_keyword">if</strong> (getPartitionMap().put(partitionId, partition) != <strong class="jxr_keyword">null</strong>) {
-<a name="1286" href="#1286">1286</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1287" href="#1287">1287</a> <span class="jxr_string">"loadCheckpoint: Already has partition owner "</span> +
-<a name="1288" href="#1288">1288</a> partitionOwner);
-<a name="1289" href="#1289">1289</a> }
-<a name="1290" href="#1290">1290</a> ++loadedPartitions;
-<a name="1291" href="#1291">1291</a> } <strong class="jxr_keyword">catch</strong> (IOException e) {
-<a name="1292" href="#1292">1292</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
-<a name="1293" href="#1293">1293</a> <span class="jxr_string">"loadCheckpoing: Failed to get partition owner "</span> +
-<a name="1294" href="#1294">1294</a> partitionOwner, e);
-<a name="1295" href="#1295">1295</a> }
-<a name="1296" href="#1296">1296</a> }
-<a name="1297" href="#1297">1297</a> }
-<a name="1298" href="#1298">1298</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1299" href="#1299">1299</a> LOG.info(<span class="jxr_string">"loadCheckpoint: Loaded "</span> + loadedPartitions +
-<a name="1300" href="#1300">1300</a> <span class="jxr_string">" partitions of out "</span> +
-<a name="1301" href="#1301">1301</a> workerGraphPartitioner.getPartitionOwners().size() +
-<a name="1302" href="#1302">1302</a> <span class="jxr_string">" total."</span>);
-<a name="1303" href="#1303">1303</a> }
-<a name="1304" href="#1304">1304</a> <em class="jxr_comment">// Communication service needs to setup the connections prior to</em>
-<a name="1305" href="#1305">1305</a> <em class="jxr_comment">// processing vertices</em>
-<a name="1306" href="#1306">1306</a> commService.setup();
-<a name="1307" href="#1307">1307</a> }
-<a name="1308" href="#1308">1308</a>
-<a name="1309" href="#1309">1309</a> <em class="jxr_javadoccomment">/**</em>
-<a name="1310" href="#1310">1310</a> <em class="jxr_javadoccomment"> * Send the worker partitions to their destination workers</em>
-<a name="1311" href="#1311">1311</a> <em class="jxr_javadoccomment"> *</em>
-<a name="1312" href="#1312">1312</a> <em class="jxr_javadoccomment"> * @param workerPartitionMap Map of worker info to the partitions stored</em>
-<a name="1313" href="#1313">1313</a> <em class="jxr_javadoccomment"> * on this worker to be sent</em>
-<a name="1314" href="#1314">1314</a> <em class="jxr_javadoccomment"> */</em>
-<a name="1315" href="#1315">1315</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> sendWorkerPartitions(
-<a name="1316" href="#1316">1316</a> Map<WorkerInfo, List<Integer>> workerPartitionMap) {
-<a name="1317" href="#1317">1317</a> List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
-<a name="1318" href="#1318">1318</a> <strong class="jxr_keyword">new</strong> ArrayList<Entry<WorkerInfo, List<Integer>>>(
-<a name="1319" href="#1319">1319</a> workerPartitionMap.entrySet());
-<a name="1320" href="#1320">1320</a> Collections.shuffle(randomEntryList);
-<a name="1321" href="#1321">1321</a> <strong class="jxr_keyword">for</strong> (Entry<WorkerInfo, List<Integer>> workerPartitionList :
-<a name="1322" href="#1322">1322</a> randomEntryList) {
-<a name="1323" href="#1323">1323</a> <strong class="jxr_keyword">for</strong> (Integer partitionId : workerPartitionList.getValue()) {
-<a name="1324" href="#1324">1324</a> Partition<I, V, E, M> partition =
-<a name="1325" href="#1325">1325</a> getPartitionMap().get(partitionId);
-<a name="1326" href="#1326">1326</a> <strong class="jxr_keyword">if</strong> (partition == <strong class="jxr_keyword">null</strong>) {
-<a name="1327" href="#1327">1327</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1328" href="#1328">1328</a> <span class="jxr_string">"sendWorkerPartitions: Couldn't find partition "</span> +
-<a name="1329" href="#1329">1329</a> partitionId + <span class="jxr_string">" to send to "</span> +
-<a name="1330" href="#1330">1330</a> workerPartitionList.getKey());
-<a name="1331" href="#1331">1331</a> }
-<a name="1332" href="#1332">1332</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1333" href="#1333">1333</a> LOG.info(<span class="jxr_string">"sendWorkerPartitions: Sending worker "</span> +
-<a name="1334" href="#1334">1334</a> workerPartitionList.getKey() + <span class="jxr_string">" partition "</span> +
-<a name="1335" href="#1335">1335</a> partitionId);
-<a name="1336" href="#1336">1336</a> }
-<a name="1337" href="#1337">1337</a> getGraphMapper().getGraphState().getWorkerCommunications().
-<a name="1338" href="#1338">1338</a> sendPartitionRequest(workerPartitionList.getKey(),
-<a name="1339" href="#1339">1339</a> partition);
-<a name="1340" href="#1340">1340</a> getPartitionMap().remove(partitionId);
-<a name="1341" href="#1341">1341</a> }
-<a name="1342" href="#1342">1342</a> }
-<a name="1343" href="#1343">1343</a>
-<a name="1344" href="#1344">1344</a> <strong class="jxr_keyword">try</strong> {
-<a name="1345" href="#1345">1345</a> getGraphMapper().getGraphState().getWorkerCommunications().flush();
-<a name="1346" href="#1346">1346</a> } <strong class="jxr_keyword">catch</strong> (IOException e) {
-<a name="1347" href="#1347">1347</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(<span class="jxr_string">"sendWorkerPartitions: Flush failed"</span>, e);
-<a name="1348" href="#1348">1348</a> }
-<a name="1349" href="#1349">1349</a> String myPartitionExchangeDonePath =
-<a name="1350" href="#1350">1350</a> getPartitionExchangeWorkerPath(
-<a name="1351" href="#1351">1351</a> getApplicationAttempt(), getSuperstep(), getWorkerInfo());
-<a name="1352" href="#1352">1352</a> <strong class="jxr_keyword">try</strong> {
-<a name="1353" href="#1353">1353</a> getZkExt().createExt(myPartitionExchangeDonePath,
-<a name="1354" href="#1354">1354</a> <strong class="jxr_keyword">null</strong>,
-<a name="1355" href="#1355">1355</a> Ids.OPEN_ACL_UNSAFE,
-<a name="1356" href="#1356">1356</a> CreateMode.PERSISTENT,
-<a name="1357" href="#1357">1357</a> <strong class="jxr_keyword">true</strong>);
-<a name="1358" href="#1358">1358</a> } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
-<a name="1359" href="#1359">1359</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1360" href="#1360">1360</a> <span class="jxr_string">"sendWorkerPartitions: KeeperException to create "</span> +
-<a name="1361" href="#1361">1361</a> myPartitionExchangeDonePath, e);
-<a name="1362" href="#1362">1362</a> } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
-<a name="1363" href="#1363">1363</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1364" href="#1364">1364</a> <span class="jxr_string">"sendWorkerPartitions: InterruptedException to create "</span> +
-<a name="1365" href="#1365">1365</a> myPartitionExchangeDonePath, e);
-<a name="1366" href="#1366">1366</a> }
-<a name="1367" href="#1367">1367</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1368" href="#1368">1368</a> LOG.info(<span class="jxr_string">"sendWorkerPartitions: Done sending all my partitions."</span>);
-<a name="1369" href="#1369">1369</a> }
-<a name="1370" href="#1370">1370</a> }
-<a name="1371" href="#1371">1371</a>
-<a name="1372" href="#1372">1372</a> @Override
-<a name="1373" href="#1373">1373</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">final</strong> <strong class="jxr_keyword">void</strong> exchangeVertexPartitions(
-<a name="1374" href="#1374">1374</a> Collection<? <strong class="jxr_keyword">extends</strong> PartitionOwner> masterSetPartitionOwners) {
-<a name="1375" href="#1375">1375</a> <em class="jxr_comment">// 1. Fix the addresses of the partition ids if they have changed.</em>
-<a name="1376" href="#1376">1376</a> <em class="jxr_comment">// 2. Send all the partitions to their destination workers in a random</em>
-<a name="1377" href="#1377">1377</a> <em class="jxr_comment">// fashion.</em>
-<a name="1378" href="#1378">1378</a> <em class="jxr_comment">// 3. Notify completion with a ZooKeeper stamp</em>
-<a name="1379" href="#1379">1379</a> <em class="jxr_comment">// 4. Wait for all my dependencies to be done (if any)</em>
-<a name="1380" href="#1380">1380</a> <em class="jxr_comment">// 5. Add the partitions to myself.</em>
-<a name="1381" href="#1381">1381</a> <a href="../../../../org/apache/giraph/graph/partition/PartitionExchange.html">PartitionExchange</a> partitionExchange =
-<a name="1382" href="#1382">1382</a> workerGraphPartitioner.updatePartitionOwners(
-<a name="1383" href="#1383">1383</a> getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
-<a name="1384" href="#1384">1384</a> commService.fixPartitionIdToSocketAddrMap();
-<a name="1385" href="#1385">1385</a>
-<a name="1386" href="#1386">1386</a> Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
-<a name="1387" href="#1387">1387</a> partitionExchange.getSendWorkerPartitionMap();
-<a name="1388" href="#1388">1388</a> <strong class="jxr_keyword">if</strong> (!workerPartitionMap.isEmpty()) {
-<a name="1389" href="#1389">1389</a> sendWorkerPartitions(sendWorkerPartitionMap);
-<a name="1390" href="#1390">1390</a> }
-<a name="1391" href="#1391">1391</a>
-<a name="1392" href="#1392">1392</a> Set<WorkerInfo> myDependencyWorkerSet =
-<a name="1393" href="#1393">1393</a> partitionExchange.getMyDependencyWorkerSet();
-<a name="1394" href="#1394">1394</a> Set<String> workerIdSet = <strong class="jxr_keyword">new</strong> HashSet<String>();
-<a name="1395" href="#1395">1395</a> <strong class="jxr_keyword">for</strong> (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
-<a name="1396" href="#1396">1396</a> <strong class="jxr_keyword">if</strong> (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
-<a name="1397" href="#1397">1397</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1398" href="#1398">1398</a> <span class="jxr_string">"exchangeVertexPartitions: Duplicate entry "</span> + tmpWorkerInfo);
-<a name="1399" href="#1399">1399</a> }
-<a name="1400" href="#1400">1400</a> }
-<a name="1401" href="#1401">1401</a> <strong class="jxr_keyword">if</strong> (myDependencyWorkerSet.isEmpty() && workerPartitionMap.isEmpty()) {
-<a name="1402" href="#1402">1402</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1403" href="#1403">1403</a> LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Nothing to exchange, "</span> +
-<a name="1404" href="#1404">1404</a> <span class="jxr_string">"exiting early"</span>);
-<a name="1405" href="#1405">1405</a> }
-<a name="1406" href="#1406">1406</a> <strong class="jxr_keyword">return</strong>;
+<a name="1215" href="#1215">1215</a>
+<a name="1216" href="#1216">1216</a> <em class="jxr_comment">// Notify master that checkpoint is stored</em>
+<a name="1217" href="#1217">1217</a> String workerWroteCheckpoint =
+<a name="1218" href="#1218">1218</a> getWorkerWroteCheckpointPath(getApplicationAttempt(),
+<a name="1219" href="#1219">1219</a> getSuperstep()) + <span class="jxr_string">"/"</span> + getHostnamePartitionId();
+<a name="1220" href="#1220">1220</a> <strong class="jxr_keyword">try</strong> {
+<a name="1221" href="#1221">1221</a> getZkExt().createExt(workerWroteCheckpoint,
+<a name="1222" href="#1222">1222</a> <strong class="jxr_keyword">new</strong> byte[0],
+<a name="1223" href="#1223">1223</a> Ids.OPEN_ACL_UNSAFE,
+<a name="1224" href="#1224">1224</a> CreateMode.PERSISTENT,
+<a name="1225" href="#1225">1225</a> <strong class="jxr_keyword">true</strong>);
+<a name="1226" href="#1226">1226</a> } <strong class="jxr_keyword">catch</strong> (KeeperException.NodeExistsException e) {
+<a name="1227" href="#1227">1227</a> LOG.warn(<span class="jxr_string">"finishSuperstep: wrote checkpoint worker path "</span> +
+<a name="1228" href="#1228">1228</a> workerWroteCheckpoint + <span class="jxr_string">" already exists!"</span>);
+<a name="1229" href="#1229">1229</a> } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
+<a name="1230" href="#1230">1230</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(<span class="jxr_string">"Creating "</span> + workerWroteCheckpoint +
+<a name="1231" href="#1231">1231</a> <span class="jxr_string">" failed with KeeperException"</span>, e);
+<a name="1232" href="#1232">1232</a> } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
+<a name="1233" href="#1233">1233</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(<span class="jxr_string">"Creating "</span> + workerWroteCheckpoint +
+<a name="1234" href="#1234">1234</a> <span class="jxr_string">" failed with InterruptedException"</span>, e);
+<a name="1235" href="#1235">1235</a> }
+<a name="1236" href="#1236">1236</a> }
+<a name="1237" href="#1237">1237</a>
+<a name="1238" href="#1238">1238</a> @Override
+<a name="1239" href="#1239">1239</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> loadCheckpoint(<strong class="jxr_keyword">long</strong> superstep) {
+<a name="1240" href="#1240">1240</a> <strong class="jxr_keyword">if</strong> (getConfiguration().getBoolean(GiraphJob.USE_NETTY,
+<a name="1241" href="#1241">1241</a> GiraphJob.USE_NETTY_DEFAULT)) {
+<a name="1242" href="#1242">1242</a> <strong class="jxr_keyword">try</strong> {
+<a name="1243" href="#1243">1243</a> <em class="jxr_comment">// clear old message stores</em>
+<a name="1244" href="#1244">1244</a> getServerData().getIncomingMessageStore().clearAll();
+<a name="1245" href="#1245">1245</a> getServerData().getCurrentMessageStore().clearAll();
+<a name="1246" href="#1246">1246</a> } <strong class="jxr_keyword">catch</strong> (IOException e) {
+<a name="1247" href="#1247">1247</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
+<a name="1248" href="#1248">1248</a> <span class="jxr_string">"loadCheckpoint: Failed to clear message stores "</span>, e);
+<a name="1249" href="#1249">1249</a> }
+<a name="1250" href="#1250">1250</a> }
+<a name="1251" href="#1251">1251</a>
+<a name="1252" href="#1252">1252</a> <em class="jxr_comment">// Algorithm:</em>
+<a name="1253" href="#1253">1253</a> <em class="jxr_comment">// Examine all the partition owners and load the ones</em>
+<a name="1254" href="#1254">1254</a> <em class="jxr_comment">// that match my hostname and id from the master designated checkpoint</em>
+<a name="1255" href="#1255">1255</a> <em class="jxr_comment">// prefixes.</em>
+<a name="1256" href="#1256">1256</a> <strong class="jxr_keyword">long</strong> startPos = 0;
+<a name="1257" href="#1257">1257</a> <strong class="jxr_keyword">int</strong> loadedPartitions = 0;
+<a name="1258" href="#1258">1258</a> <strong class="jxr_keyword">for</strong> (PartitionOwner partitionOwner :
+<a name="1259" href="#1259">1259</a> workerGraphPartitioner.getPartitionOwners()) {
+<a name="1260" href="#1260">1260</a> <strong class="jxr_keyword">if</strong> (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+<a name="1261" href="#1261">1261</a> String metadataFile =
+<a name="1262" href="#1262">1262</a> partitionOwner.getCheckpointFilesPrefix() +
+<a name="1263" href="#1263">1263</a> CHECKPOINT_METADATA_POSTFIX;
+<a name="1264" href="#1264">1264</a> String partitionsFile =
+<a name="1265" href="#1265">1265</a> partitionOwner.getCheckpointFilesPrefix() +
+<a name="1266" href="#1266">1266</a> CHECKPOINT_VERTICES_POSTFIX;
+<a name="1267" href="#1267">1267</a> <strong class="jxr_keyword">try</strong> {
+<a name="1268" href="#1268">1268</a> <strong class="jxr_keyword">int</strong> partitionId = -1;
+<a name="1269" href="#1269">1269</a> DataInputStream metadataStream =
+<a name="1270" href="#1270">1270</a> getFs().open(<strong class="jxr_keyword">new</strong> Path(metadataFile));
+<a name="1271" href="#1271">1271</a> <strong class="jxr_keyword">int</strong> partitions = metadataStream.readInt();
+<a name="1272" href="#1272">1272</a> <strong class="jxr_keyword">for</strong> (<strong class="jxr_keyword">int</strong> i = 0; i < partitions; ++i) {
+<a name="1273" href="#1273">1273</a> startPos = metadataStream.readLong();
+<a name="1274" href="#1274">1274</a> partitionId = metadataStream.readInt();
+<a name="1275" href="#1275">1275</a> <strong class="jxr_keyword">if</strong> (partitionId == partitionOwner.getPartitionId()) {
+<a name="1276" href="#1276">1276</a> <strong class="jxr_keyword">break</strong>;
+<a name="1277" href="#1277">1277</a> }
+<a name="1278" href="#1278">1278</a> }
+<a name="1279" href="#1279">1279</a> <strong class="jxr_keyword">if</strong> (partitionId != partitionOwner.getPartitionId()) {
+<a name="1280" href="#1280">1280</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1281" href="#1281">1281</a> <span class="jxr_string">"loadCheckpoint: "</span> + partitionOwner +
+<a name="1282" href="#1282">1282</a> <span class="jxr_string">" not found!"</span>);
+<a name="1283" href="#1283">1283</a> }
+<a name="1284" href="#1284">1284</a> metadataStream.close();
+<a name="1285" href="#1285">1285</a> Partition<I, V, E, M> partition =
+<a name="1286" href="#1286">1286</a> <strong class="jxr_keyword">new</strong> Partition<I, V, E, M>(
+<a name="1287" href="#1287">1287</a> getConfiguration(),
+<a name="1288" href="#1288">1288</a> partitionId);
+<a name="1289" href="#1289">1289</a> DataInputStream partitionsStream =
+<a name="1290" href="#1290">1290</a> getFs().open(<strong class="jxr_keyword">new</strong> Path(partitionsFile));
+<a name="1291" href="#1291">1291</a> <strong class="jxr_keyword">if</strong> (partitionsStream.skip(startPos) != startPos) {
+<a name="1292" href="#1292">1292</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1293" href="#1293">1293</a> <span class="jxr_string">"loadCheckpoint: Failed to skip "</span> + startPos +
+<a name="1294" href="#1294">1294</a> <span class="jxr_string">" on "</span> + partitionsFile);
+<a name="1295" href="#1295">1295</a> }
+<a name="1296" href="#1296">1296</a> partition.readFields(partitionsStream);
+<a name="1297" href="#1297">1297</a> <strong class="jxr_keyword">if</strong> (partitionsStream.readBoolean()) {
+<a name="1298" href="#1298">1298</a> getServerData().getCurrentMessageStore().readFieldsForPartition(
+<a name="1299" href="#1299">1299</a> partitionsStream, partitionId);
+<a name="1300" href="#1300">1300</a> }
+<a name="1301" href="#1301">1301</a> partitionsStream.close();
+<a name="1302" href="#1302">1302</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1303" href="#1303">1303</a> LOG.info(<span class="jxr_string">"loadCheckpoint: Loaded partition "</span> +
+<a name="1304" href="#1304">1304</a> partition);
+<a name="1305" href="#1305">1305</a> }
+<a name="1306" href="#1306">1306</a> <strong class="jxr_keyword">if</strong> (getPartitionMap().put(partitionId, partition) != <strong class="jxr_keyword">null</strong>) {
+<a name="1307" href="#1307">1307</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1308" href="#1308">1308</a> <span class="jxr_string">"loadCheckpoint: Already has partition owner "</span> +
+<a name="1309" href="#1309">1309</a> partitionOwner);
+<a name="1310" href="#1310">1310</a> }
+<a name="1311" href="#1311">1311</a> ++loadedPartitions;
+<a name="1312" href="#1312">1312</a> } <strong class="jxr_keyword">catch</strong> (IOException e) {
+<a name="1313" href="#1313">1313</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
+<a name="1314" href="#1314">1314</a> <span class="jxr_string">"loadCheckpoing: Failed to get partition owner "</span> +
+<a name="1315" href="#1315">1315</a> partitionOwner, e);
+<a name="1316" href="#1316">1316</a> }
+<a name="1317" href="#1317">1317</a> }
+<a name="1318" href="#1318">1318</a> }
+<a name="1319" href="#1319">1319</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1320" href="#1320">1320</a> LOG.info(<span class="jxr_string">"loadCheckpoint: Loaded "</span> + loadedPartitions +
+<a name="1321" href="#1321">1321</a> <span class="jxr_string">" partitions of out "</span> +
+<a name="1322" href="#1322">1322</a> workerGraphPartitioner.getPartitionOwners().size() +
+<a name="1323" href="#1323">1323</a> <span class="jxr_string">" total."</span>);
+<a name="1324" href="#1324">1324</a> }
+<a name="1325" href="#1325">1325</a>
+<a name="1326" href="#1326">1326</a> <em class="jxr_comment">// Load global statistics</em>
+<a name="1327" href="#1327">1327</a> String finalizedCheckpointPath =
+<a name="1328" href="#1328">1328</a> getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+<a name="1329" href="#1329">1329</a> <strong class="jxr_keyword">try</strong> {
+<a name="1330" href="#1330">1330</a> DataInputStream finalizedStream =
+<a name="1331" href="#1331">1331</a> getFs().open(<strong class="jxr_keyword">new</strong> Path(finalizedCheckpointPath));
+<a name="1332" href="#1332">1332</a> <a href="../../../../org/apache/giraph/graph/GlobalStats.html">GlobalStats</a> globalStats = <strong class="jxr_keyword">new</strong> <a href="../../../../org/apache/giraph/graph/GlobalStats.html">GlobalStats</a>();
+<a name="1333" href="#1333">1333</a> globalStats.readFields(finalizedStream);
+<a name="1334" href="#1334">1334</a> getGraphMapper().getGraphState().
+<a name="1335" href="#1335">1335</a> setTotalNumEdges(globalStats.getEdgeCount()).
+<a name="1336" href="#1336">1336</a> setTotalNumVertices(globalStats.getVertexCount());
+<a name="1337" href="#1337">1337</a> } <strong class="jxr_keyword">catch</strong> (IOException e) {
+<a name="1338" href="#1338">1338</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1339" href="#1339">1339</a> <span class="jxr_string">"loadCheckpoint: Failed to load global statistics"</span>, e);
+<a name="1340" href="#1340">1340</a> }
+<a name="1341" href="#1341">1341</a>
+<a name="1342" href="#1342">1342</a> <em class="jxr_comment">// Communication service needs to setup the connections prior to</em>
+<a name="1343" href="#1343">1343</a> <em class="jxr_comment">// processing vertices</em>
+<a name="1344" href="#1344">1344</a> commService.setup();
+<a name="1345" href="#1345">1345</a> }
+<a name="1346" href="#1346">1346</a>
+<a name="1347" href="#1347">1347</a> <em class="jxr_javadoccomment">/**</em>
+<a name="1348" href="#1348">1348</a> <em class="jxr_javadoccomment"> * Send the worker partitions to their destination workers</em>
+<a name="1349" href="#1349">1349</a> <em class="jxr_javadoccomment"> *</em>
+<a name="1350" href="#1350">1350</a> <em class="jxr_javadoccomment"> * @param workerPartitionMap Map of worker info to the partitions stored</em>
+<a name="1351" href="#1351">1351</a> <em class="jxr_javadoccomment"> * on this worker to be sent</em>
+<a name="1352" href="#1352">1352</a> <em class="jxr_javadoccomment"> */</em>
+<a name="1353" href="#1353">1353</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> sendWorkerPartitions(
+<a name="1354" href="#1354">1354</a> Map<WorkerInfo, List<Integer>> workerPartitionMap) {
+<a name="1355" href="#1355">1355</a> List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
+<a name="1356" href="#1356">1356</a> <strong class="jxr_keyword">new</strong> ArrayList<Entry<WorkerInfo, List<Integer>>>(
+<a name="1357" href="#1357">1357</a> workerPartitionMap.entrySet());
+<a name="1358" href="#1358">1358</a> Collections.shuffle(randomEntryList);
+<a name="1359" href="#1359">1359</a> <strong class="jxr_keyword">for</strong> (Entry<WorkerInfo, List<Integer>> workerPartitionList :
+<a name="1360" href="#1360">1360</a> randomEntryList) {
+<a name="1361" href="#1361">1361</a> <strong class="jxr_keyword">for</strong> (Integer partitionId : workerPartitionList.getValue()) {
+<a name="1362" href="#1362">1362</a> Partition<I, V, E, M> partition =
+<a name="1363" href="#1363">1363</a> getPartitionMap().get(partitionId);
+<a name="1364" href="#1364">1364</a> <strong class="jxr_keyword">if</strong> (partition == <strong class="jxr_keyword">null</strong>) {
+<a name="1365" href="#1365">1365</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1366" href="#1366">1366</a> <span class="jxr_string">"sendWorkerPartitions: Couldn't find partition "</span> +
+<a name="1367" href="#1367">1367</a> partitionId + <span class="jxr_string">" to send to "</span> +
+<a name="1368" href="#1368">1368</a> workerPartitionList.getKey());
+<a name="1369" href="#1369">1369</a> }
+<a name="1370" href="#1370">1370</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1371" href="#1371">1371</a> LOG.info(<span class="jxr_string">"sendWorkerPartitions: Sending worker "</span> +
+<a name="1372" href="#1372">1372</a> workerPartitionList.getKey() + <span class="jxr_string">" partition "</span> +
+<a name="1373" href="#1373">1373</a> partitionId);
+<a name="1374" href="#1374">1374</a> }
+<a name="1375" href="#1375">1375</a> getGraphMapper().getGraphState().getWorkerCommunications().
+<a name="1376" href="#1376">1376</a> sendPartitionRequest(workerPartitionList.getKey(),
+<a name="1377" href="#1377">1377</a> partition);
+<a name="1378" href="#1378">1378</a> getPartitionMap().remove(partitionId);
+<a name="1379" href="#1379">1379</a> }
+<a name="1380" href="#1380">1380</a> }
+<a name="1381" href="#1381">1381</a>
+<a name="1382" href="#1382">1382</a> <strong class="jxr_keyword">try</strong> {
+<a name="1383" href="#1383">1383</a> getGraphMapper().getGraphState().getWorkerCommunications().flush();
+<a name="1384" href="#1384">1384</a> } <strong class="jxr_keyword">catch</strong> (IOException e) {
+<a name="1385" href="#1385">1385</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(<span class="jxr_string">"sendWorkerPartitions: Flush failed"</span>, e);
+<a name="1386" href="#1386">1386</a> }
+<a name="1387" href="#1387">1387</a> String myPartitionExchangeDonePath =
+<a name="1388" href="#1388">1388</a> getPartitionExchangeWorkerPath(
+<a name="1389" href="#1389">1389</a> getApplicationAttempt(), getSuperstep(), getWorkerInfo());
+<a name="1390" href="#1390">1390</a> <strong class="jxr_keyword">try</strong> {
+<a name="1391" href="#1391">1391</a> getZkExt().createExt(myPartitionExchangeDonePath,
+<a name="1392" href="#1392">1392</a> <strong class="jxr_keyword">null</strong>,
+<a name="1393" href="#1393">1393</a> Ids.OPEN_ACL_UNSAFE,
+<a name="1394" href="#1394">1394</a> CreateMode.PERSISTENT,
+<a name="1395" href="#1395">1395</a> <strong class="jxr_keyword">true</strong>);
+<a name="1396" href="#1396">1396</a> } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
+<a name="1397" href="#1397">1397</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1398" href="#1398">1398</a> <span class="jxr_string">"sendWorkerPartitions: KeeperException to create "</span> +
+<a name="1399" href="#1399">1399</a> myPartitionExchangeDonePath, e);
+<a name="1400" href="#1400">1400</a> } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
+<a name="1401" href="#1401">1401</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1402" href="#1402">1402</a> <span class="jxr_string">"sendWorkerPartitions: InterruptedException to create "</span> +
+<a name="1403" href="#1403">1403</a> myPartitionExchangeDonePath, e);
+<a name="1404" href="#1404">1404</a> }
+<a name="1405" href="#1405">1405</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1406" href="#1406">1406</a> LOG.info(<span class="jxr_string">"sendWorkerPartitions: Done sending all my partitions."</span>);
<a name="1407" href="#1407">1407</a> }
-<a name="1408" href="#1408">1408</a>
-<a name="1409" href="#1409">1409</a> String vertexExchangePath =
-<a name="1410" href="#1410">1410</a> getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
-<a name="1411" href="#1411">1411</a> List<String> workerDoneList;
-<a name="1412" href="#1412">1412</a> <strong class="jxr_keyword">try</strong> {
-<a name="1413" href="#1413">1413</a> <strong class="jxr_keyword">while</strong> (<strong class="jxr_keyword">true</strong>) {
-<a name="1414" href="#1414">1414</a> workerDoneList = getZkExt().getChildrenExt(
-<a name="1415" href="#1415">1415</a> vertexExchangePath, <strong class="jxr_keyword">true</strong>, false, false);
-<a name="1416" href="#1416">1416</a> workerIdSet.removeAll(workerDoneList);
-<a name="1417" href="#1417">1417</a> <strong class="jxr_keyword">if</strong> (workerIdSet.isEmpty()) {
-<a name="1418" href="#1418">1418</a> <strong class="jxr_keyword">break</strong>;
-<a name="1419" href="#1419">1419</a> }
-<a name="1420" href="#1420">1420</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1421" href="#1421">1421</a> LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Waiting for workers "</span> +
-<a name="1422" href="#1422">1422</a> workerIdSet);
-<a name="1423" href="#1423">1423</a> }
-<a name="1424" href="#1424">1424</a> getPartitionExchangeChildrenChangedEvent().waitForever();
-<a name="1425" href="#1425">1425</a> getPartitionExchangeChildrenChangedEvent().reset();
-<a name="1426" href="#1426">1426</a> }
-<a name="1427" href="#1427">1427</a> } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
-<a name="1428" href="#1428">1428</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(e);
-<a name="1429" href="#1429">1429</a> } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
-<a name="1430" href="#1430">1430</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(e);
-<a name="1431" href="#1431">1431</a> }
-<a name="1432" href="#1432">1432</a>
-<a name="1433" href="#1433">1433</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1434" href="#1434">1434</a> LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Done with exchange."</span>);
-<a name="1435" href="#1435">1435</a> }
-<a name="1436" href="#1436">1436</a>
-<a name="1437" href="#1437">1437</a> <em class="jxr_comment">// Add the partitions sent earlier</em>
-<a name="1438" href="#1438">1438</a> movePartitionsToWorker(commService);
-<a name="1439" href="#1439">1439</a> }
-<a name="1440" href="#1440">1440</a>
-<a name="1441" href="#1441">1441</a> <em class="jxr_javadoccomment">/**</em>
-<a name="1442" href="#1442">1442</a> <em class="jxr_javadoccomment"> * Partitions that are exchanged need to be moved from the communication</em>
-<a name="1443" href="#1443">1443</a> <em class="jxr_javadoccomment"> * service to the worker.</em>
-<a name="1444" href="#1444">1444</a> <em class="jxr_javadoccomment"> *</em>
-<a name="1445" href="#1445">1445</a> <em class="jxr_javadoccomment"> * @param commService Communication service where the partitions are</em>
-<a name="1446" href="#1446">1446</a> <em class="jxr_javadoccomment"> * temporarily stored.</em>
-<a name="1447" href="#1447">1447</a> <em class="jxr_javadoccomment"> */</em>
-<a name="1448" href="#1448">1448</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> movePartitionsToWorker(
-<a name="1449" href="#1449">1449</a> WorkerServer<I, V, E, M> commService) {
-<a name="1450" href="#1450">1450</a> Map<Integer, Collection<Vertex<I, V, E, M>>> inPartitionVertexMap =
-<a name="1451" href="#1451">1451</a> commService.getInPartitionVertexMap();
-<a name="1452" href="#1452">1452</a> <strong class="jxr_keyword">synchronized</strong> (inPartitionVertexMap) {
-<a name="1453" href="#1453">1453</a> <strong class="jxr_keyword">for</strong> (Entry<Integer, Collection<Vertex<I, V, E, M>>> entry :
-<a name="1454" href="#1454">1454</a> inPartitionVertexMap.entrySet()) {
-<a name="1455" href="#1455">1455</a> <strong class="jxr_keyword">if</strong> (getPartitionMap().containsKey(entry.getKey())) {
-<a name="1456" href="#1456">1456</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1457" href="#1457">1457</a> <span class="jxr_string">"moveVerticesToWorker: Already has partition "</span> +
-<a name="1458" href="#1458">1458</a> getPartitionMap().get(entry.getKey()) +
-<a name="1459" href="#1459">1459</a> <span class="jxr_string">", cannot receive vertex list of size "</span> +
-<a name="1460" href="#1460">1460</a> entry.getValue().size());
+<a name="1408" href="#1408">1408</a> }
+<a name="1409" href="#1409">1409</a>
+<a name="1410" href="#1410">1410</a> @Override
+<a name="1411" href="#1411">1411</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">final</strong> <strong class="jxr_keyword">void</strong> exchangeVertexPartitions(
+<a name="1412" href="#1412">1412</a> Collection<? <strong class="jxr_keyword">extends</strong> PartitionOwner> masterSetPartitionOwners) {
+<a name="1413" href="#1413">1413</a> <em class="jxr_comment">// 1. Fix the addresses of the partition ids if they have changed.</em>
+<a name="1414" href="#1414">1414</a> <em class="jxr_comment">// 2. Send all the partitions to their destination workers in a random</em>
+<a name="1415" href="#1415">1415</a> <em class="jxr_comment">// fashion.</em>
+<a name="1416" href="#1416">1416</a> <em class="jxr_comment">// 3. Notify completion with a ZooKeeper stamp</em>
+<a name="1417" href="#1417">1417</a> <em class="jxr_comment">// 4. Wait for all my dependencies to be done (if any)</em>
+<a name="1418" href="#1418">1418</a> <em class="jxr_comment">// 5. Add the partitions to myself.</em>
+<a name="1419" href="#1419">1419</a> <a href="../../../../org/apache/giraph/graph/partition/PartitionExchange.html">PartitionExchange</a> partitionExchange =
+<a name="1420" href="#1420">1420</a> workerGraphPartitioner.updatePartitionOwners(
+<a name="1421" href="#1421">1421</a> getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+<a name="1422" href="#1422">1422</a> commService.fixPartitionIdToSocketAddrMap();
+<a name="1423" href="#1423">1423</a>
+<a name="1424" href="#1424">1424</a> Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
+<a name="1425" href="#1425">1425</a> partitionExchange.getSendWorkerPartitionMap();
+<a name="1426" href="#1426">1426</a> <strong class="jxr_keyword">if</strong> (!workerPartitionMap.isEmpty()) {
+<a name="1427" href="#1427">1427</a> sendWorkerPartitions(sendWorkerPartitionMap);
+<a name="1428" href="#1428">1428</a> }
+<a name="1429" href="#1429">1429</a>
+<a name="1430" href="#1430">1430</a> Set<WorkerInfo> myDependencyWorkerSet =
+<a name="1431" href="#1431">1431</a> partitionExchange.getMyDependencyWorkerSet();
+<a name="1432" href="#1432">1432</a> Set<String> workerIdSet = <strong class="jxr_keyword">new</strong> HashSet<String>();
+<a name="1433" href="#1433">1433</a> <strong class="jxr_keyword">for</strong> (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
+<a name="1434" href="#1434">1434</a> <strong class="jxr_keyword">if</strong> (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
+<a name="1435" href="#1435">1435</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1436" href="#1436">1436</a> <span class="jxr_string">"exchangeVertexPartitions: Duplicate entry "</span> + tmpWorkerInfo);
+<a name="1437" href="#1437">1437</a> }
+<a name="1438" href="#1438">1438</a> }
+<a name="1439" href="#1439">1439</a> <strong class="jxr_keyword">if</strong> (myDependencyWorkerSet.isEmpty() && workerPartitionMap.isEmpty()) {
+<a name="1440" href="#1440">1440</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1441" href="#1441">1441</a> LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Nothing to exchange, "</span> +
+<a name="1442" href="#1442">1442</a> <span class="jxr_string">"exiting early"</span>);
+<a name="1443" href="#1443">1443</a> }
+<a name="1444" href="#1444">1444</a> <strong class="jxr_keyword">return</strong>;
+<a name="1445" href="#1445">1445</a> }
+<a name="1446" href="#1446">1446</a>
+<a name="1447" href="#1447">1447</a> String vertexExchangePath =
+<a name="1448" href="#1448">1448</a> getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
+<a name="1449" href="#1449">1449</a> List<String> workerDoneList;
+<a name="1450" href="#1450">1450</a> <strong class="jxr_keyword">try</strong> {
+<a name="1451" href="#1451">1451</a> <strong class="jxr_keyword">while</strong> (<strong class="jxr_keyword">true</strong>) {
+<a name="1452" href="#1452">1452</a> workerDoneList = getZkExt().getChildrenExt(
+<a name="1453" href="#1453">1453</a> vertexExchangePath, <strong class="jxr_keyword">true</strong>, false, false);
+<a name="1454" href="#1454">1454</a> workerIdSet.removeAll(workerDoneList);
+<a name="1455" href="#1455">1455</a> <strong class="jxr_keyword">if</strong> (workerIdSet.isEmpty()) {
+<a name="1456" href="#1456">1456</a> <strong class="jxr_keyword">break</strong>;
+<a name="1457" href="#1457">1457</a> }
+<a name="1458" href="#1458">1458</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1459" href="#1459">1459</a> LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Waiting for workers "</span> +
+<a name="1460" href="#1460">1460</a> workerIdSet);
<a name="1461" href="#1461">1461</a> }
-<a name="1462" href="#1462">1462</a>
-<a name="1463" href="#1463">1463</a> Partition<I, V, E, M> tmpPartition =
-<a name="1464" href="#1464">1464</a> <strong class="jxr_keyword">new</strong> Partition<I, V, E, M>(getConfiguration(),
-<a name="1465" href="#1465">1465</a> entry.getKey());
-<a name="1466" href="#1466">1466</a> <strong class="jxr_keyword">synchronized</strong> (entry.getValue()) {
-<a name="1467" href="#1467">1467</a> <strong class="jxr_keyword">for</strong> (Vertex<I, V, E, M> vertex : entry.getValue()) {
-<a name="1468" href="#1468">1468</a> <strong class="jxr_keyword">if</strong> (tmpPartition.putVertex(vertex) != <strong class="jxr_keyword">null</strong>) {
-<a name="1469" href="#1469">1469</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
-<a name="1470" href="#1470">1470</a> <span class="jxr_string">"moveVerticesToWorker: Vertex "</span> + vertex +
-<a name="1471" href="#1471">1471</a> <span class="jxr_string">" already exists!"</span>);
-<a name="1472" href="#1472">1472</a> }
-<a name="1473" href="#1473">1473</a> }
-<a name="1474" href="#1474">1474</a> <strong class="jxr_keyword">if</strong> (LOG.isDebugEnabled()) {
-<a name="1475" href="#1475">1475</a> LOG.debug(<span class="jxr_string">"movePartitionsToWorker: Adding "</span> +
-<a name="1476" href="#1476">1476</a> entry.getValue().size() +
-<a name="1477" href="#1477">1477</a> <span class="jxr_string">" vertices for partition id "</span> + entry.getKey());
-<a name="1478" href="#1478">1478</a> }
-<a name="1479" href="#1479">1479</a> getPartitionMap().put(tmpPartition.getId(),
-<a name="1480" href="#1480">1480</a> tmpPartition);
-<a name="1481" href="#1481">1481</a> entry.getValue().clear();
-<a name="1482" href="#1482">1482</a> }
-<a name="1483" href="#1483">1483</a> }
-<a name="1484" href="#1484">1484</a> inPartitionVertexMap.clear();
-<a name="1485" href="#1485">1485</a> }
-<a name="1486" href="#1486">1486</a> }
-<a name="1487" href="#1487">1487</a>
-<a name="1488" href="#1488">1488</a> <em class="jxr_javadoccomment">/**</em>
-<a name="1489" href="#1489">1489</a> <em class="jxr_javadoccomment"> * Get event when the state of a partition exchange has changed.</em>
-<a name="1490" href="#1490">1490</a> <em class="jxr_javadoccomment"> *</em>
-<a name="1491" href="#1491">1491</a> <em class="jxr_javadoccomment"> * @return Event to check.</em>
-<a name="1492" href="#1492">1492</a> <em class="jxr_javadoccomment"> */</em>
-<a name="1493" href="#1493">1493</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">final</strong> <a href="../../../../org/apache/giraph/zk/BspEvent.html">BspEvent</a> getPartitionExchangeChildrenChangedEvent() {
-<a name="1494" href="#1494">1494</a> <strong class="jxr_keyword">return</strong> partitionExchangeChildrenChanged;
-<a name="1495" href="#1495">1495</a> }
-<a name="1496" href="#1496">1496</a>
-<a name="1497" href="#1497">1497</a> @Override
-<a name="1498" href="#1498">1498</a> <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">boolean</strong> processEvent(WatchedEvent event) {
-<a name="1499" href="#1499">1499</a> <strong class="jxr_keyword">boolean</strong> foundEvent = false;
-<a name="1500" href="#1500">1500</a> <strong class="jxr_keyword">if</strong> (event.getPath().startsWith(masterJobStatePath) &&
-<a name="1501" href="#1501">1501</a> (event.getType() == EventType.NodeChildrenChanged)) {
-<a name="1502" href="#1502">1502</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1503" href="#1503">1503</a> LOG.info(<span class="jxr_string">"processEvent: Job state changed, checking "</span> +
-<a name="1504" href="#1504">1504</a> <span class="jxr_string">"to see if it needs to restart"</span>);
-<a name="1505" href="#1505">1505</a> }
-<a name="1506" href="#1506">1506</a> JSONObject jsonObj = getJobState();
-<a name="1507" href="#1507">1507</a> <strong class="jxr_keyword">try</strong> {
-<a name="1508" href="#1508">1508</a> <strong class="jxr_keyword">if</strong> ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
-<a name="1509" href="#1509">1509</a> ApplicationState.START_SUPERSTEP) &&
-<a name="1510" href="#1510">1510</a> jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
-<a name="1511" href="#1511">1511</a> getApplicationAttempt()) {
-<a name="1512" href="#1512">1512</a> LOG.fatal(<span class="jxr_string">"processEvent: Worker will restart "</span> +
-<a name="1513" href="#1513">1513</a> <span class="jxr_string">"from command - "</span> + jsonObj.toString());
-<a name="1514" href="#1514">1514</a> System.exit(-1);
-<a name="1515" href="#1515">1515</a> }
-<a name="1516" href="#1516">1516</a> } <strong class="jxr_keyword">catch</strong> (JSONException e) {
-<a name="1517" href="#1517">1517</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
-<a name="1518" href="#1518">1518</a> <span class="jxr_string">"processEvent: Couldn't properly get job state from "</span> +
-<a name="1519" href="#1519">1519</a> jsonObj.toString());
-<a name="1520" href="#1520">1520</a> }
-<a name="1521" href="#1521">1521</a> foundEvent = <strong class="jxr_keyword">true</strong>;
-<a name="1522" href="#1522">1522</a> } <strong class="jxr_keyword">else</strong> <strong class="jxr_keyword">if</strong> (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
-<a name="1523" href="#1523">1523</a> event.getType() == EventType.NodeChildrenChanged) {
-<a name="1524" href="#1524">1524</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
-<a name="1525" href="#1525">1525</a> LOG.info(<span class="jxr_string">"processEvent : partitionExchangeChildrenChanged "</span> +
-<a name="1526" href="#1526">1526</a> <span class="jxr_string">"(at least one worker is done sending partitions)"</span>);
-<a name="1527" href="#1527">1527</a> }
-<a name="1528" href="#1528">1528</a> partitionExchangeChildrenChanged.signal();
-<a name="1529" href="#1529">1529</a> foundEvent = <strong class="jxr_keyword">true</strong>;
-<a name="1530" href="#1530">1530</a> }
-<a name="1531" href="#1531">1531</a>
-<a name="1532" href="#1532">1532</a> <strong class="jxr_keyword">return</strong> foundEvent;
+<a name="1462" href="#1462">1462</a> getPartitionExchangeChildrenChangedEvent().waitForever();
+<a name="1463" href="#1463">1463</a> getPartitionExchangeChildrenChangedEvent().reset();
+<a name="1464" href="#1464">1464</a> }
+<a name="1465" href="#1465">1465</a> } <strong class="jxr_keyword">catch</strong> (KeeperException e) {
+<a name="1466" href="#1466">1466</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(e);
+<a name="1467" href="#1467">1467</a> } <strong class="jxr_keyword">catch</strong> (InterruptedException e) {
+<a name="1468" href="#1468">1468</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(e);
+<a name="1469" href="#1469">1469</a> }
+<a name="1470" href="#1470">1470</a>
+<a name="1471" href="#1471">1471</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1472" href="#1472">1472</a> LOG.info(<span class="jxr_string">"exchangeVertexPartitions: Done with exchange."</span>);
+<a name="1473" href="#1473">1473</a> }
+<a name="1474" href="#1474">1474</a>
+<a name="1475" href="#1475">1475</a> <em class="jxr_comment">// Add the partitions sent earlier</em>
+<a name="1476" href="#1476">1476</a> movePartitionsToWorker(commService);
+<a name="1477" href="#1477">1477</a> }
+<a name="1478" href="#1478">1478</a>
+<a name="1479" href="#1479">1479</a> <em class="jxr_javadoccomment">/**</em>
+<a name="1480" href="#1480">1480</a> <em class="jxr_javadoccomment"> * Partitions that are exchanged need to be moved from the communication</em>
+<a name="1481" href="#1481">1481</a> <em class="jxr_javadoccomment"> * service to the worker.</em>
+<a name="1482" href="#1482">1482</a> <em class="jxr_javadoccomment"> *</em>
+<a name="1483" href="#1483">1483</a> <em class="jxr_javadoccomment"> * @param commService Communication service where the partitions are</em>
+<a name="1484" href="#1484">1484</a> <em class="jxr_javadoccomment"> * temporarily stored.</em>
+<a name="1485" href="#1485">1485</a> <em class="jxr_javadoccomment"> */</em>
+<a name="1486" href="#1486">1486</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> movePartitionsToWorker(
+<a name="1487" href="#1487">1487</a> WorkerServer<I, V, E, M> commService) {
+<a name="1488" href="#1488">1488</a> Map<Integer, Collection<Vertex<I, V, E, M>>> inPartitionVertexMap =
+<a name="1489" href="#1489">1489</a> commService.getInPartitionVertexMap();
+<a name="1490" href="#1490">1490</a> <strong class="jxr_keyword">synchronized</strong> (inPartitionVertexMap) {
+<a name="1491" href="#1491">1491</a> <strong class="jxr_keyword">for</strong> (Entry<Integer, Collection<Vertex<I, V, E, M>>> entry :
+<a name="1492" href="#1492">1492</a> inPartitionVertexMap.entrySet()) {
+<a name="1493" href="#1493">1493</a> <strong class="jxr_keyword">if</strong> (getPartitionMap().containsKey(entry.getKey())) {
+<a name="1494" href="#1494">1494</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1495" href="#1495">1495</a> <span class="jxr_string">"moveVerticesToWorker: Already has partition "</span> +
+<a name="1496" href="#1496">1496</a> getPartitionMap().get(entry.getKey()) +
+<a name="1497" href="#1497">1497</a> <span class="jxr_string">", cannot receive vertex list of size "</span> +
+<a name="1498" href="#1498">1498</a> entry.getValue().size());
+<a name="1499" href="#1499">1499</a> }
+<a name="1500" href="#1500">1500</a>
+<a name="1501" href="#1501">1501</a> Partition<I, V, E, M> tmpPartition =
+<a name="1502" href="#1502">1502</a> <strong class="jxr_keyword">new</strong> Partition<I, V, E, M>(getConfiguration(),
+<a name="1503" href="#1503">1503</a> entry.getKey());
+<a name="1504" href="#1504">1504</a> <strong class="jxr_keyword">synchronized</strong> (entry.getValue()) {
+<a name="1505" href="#1505">1505</a> <strong class="jxr_keyword">for</strong> (Vertex<I, V, E, M> vertex : entry.getValue()) {
+<a name="1506" href="#1506">1506</a> <strong class="jxr_keyword">if</strong> (tmpPartition.putVertex(vertex) != <strong class="jxr_keyword">null</strong>) {
+<a name="1507" href="#1507">1507</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(
+<a name="1508" href="#1508">1508</a> <span class="jxr_string">"moveVerticesToWorker: Vertex "</span> + vertex +
+<a name="1509" href="#1509">1509</a> <span class="jxr_string">" already exists!"</span>);
+<a name="1510" href="#1510">1510</a> }
+<a name="1511" href="#1511">1511</a> }
+<a name="1512" href="#1512">1512</a> <strong class="jxr_keyword">if</strong> (LOG.isDebugEnabled()) {
+<a name="1513" href="#1513">1513</a> LOG.debug(<span class="jxr_string">"movePartitionsToWorker: Adding "</span> +
+<a name="1514" href="#1514">1514</a> entry.getValue().size() +
+<a name="1515" href="#1515">1515</a> <span class="jxr_string">" vertices for partition id "</span> + entry.getKey());
+<a name="1516" href="#1516">1516</a> }
+<a name="1517" href="#1517">1517</a> getPartitionMap().put(tmpPartition.getId(),
+<a name="1518" href="#1518">1518</a> tmpPartition);
+<a name="1519" href="#1519">1519</a> entry.getValue().clear();
+<a name="1520" href="#1520">1520</a> }
+<a name="1521" href="#1521">1521</a> }
+<a name="1522" href="#1522">1522</a> inPartitionVertexMap.clear();
+<a name="1523" href="#1523">1523</a> }
+<a name="1524" href="#1524">1524</a> }
+<a name="1525" href="#1525">1525</a>
+<a name="1526" href="#1526">1526</a> <em class="jxr_javadoccomment">/**</em>
+<a name="1527" href="#1527">1527</a> <em class="jxr_javadoccomment"> * Get event when the state of a partition exchange has changed.</em>
+<a name="1528" href="#1528">1528</a> <em class="jxr_javadoccomment"> *</em>
+<a name="1529" href="#1529">1529</a> <em class="jxr_javadoccomment"> * @return Event to check.</em>
+<a name="1530" href="#1530">1530</a> <em class="jxr_javadoccomment"> */</em>
+<a name="1531" href="#1531">1531</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">final</strong> <a href="../../../../org/apache/giraph/zk/BspEvent.html">BspEvent</a> getPartitionExchangeChildrenChangedEvent() {
+<a name="1532" href="#1532">1532</a> <strong class="jxr_keyword">return</strong> partitionExchangeChildrenChanged;
<a name="1533" href="#1533">1533</a> }
<a name="1534" href="#1534">1534</a>
<a name="1535" href="#1535">1535</a> @Override
-<a name="1536" href="#1536">1536</a> <strong class="jxr_keyword">public</strong> <a href="../../../../org/apache/giraph/graph/WorkerInfo.html">WorkerInfo</a> getWorkerInfo() {
-<a name="1537" href="#1537">1537</a> <strong class="jxr_keyword">return</strong> workerInfo;
-<a name="1538" href="#1538">1538</a> }
-<a name="1539" href="#1539">1539</a>
-<a name="1540" href="#1540">1540</a> @Override
-<a name="1541" href="#1541">1541</a> <strong class="jxr_keyword">public</strong> Map<Integer, Partition<I, V, E, M>> getPartitionMap() {
-<a name="1542" href="#1542">1542</a> <strong class="jxr_keyword">return</strong> workerPartitionMap;
-<a name="1543" href="#1543">1543</a> }
-<a name="1544" href="#1544">1544</a>
-<a name="1545" href="#1545">1545</a> @Override
-<a name="1546" href="#1546">1546</a> <strong class="jxr_keyword">public</strong> Collection<? <strong class="jxr_keyword">extends</strong> PartitionOwner> getPartitionOwners() {
-<a name="1547" href="#1547">1547</a> <strong class="jxr_keyword">return</strong> workerGraphPartitioner.getPartitionOwners();
-<a name="1548" href="#1548">1548</a> }
-<a name="1549" href="#1549">1549</a>
-<a name="1550" href="#1550">1550</a> @Override
-<a name="1551" href="#1551">1551</a> <strong class="jxr_keyword">public</strong> <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> getVertexPartitionOwner(I vertexId) {
-<a name="1552" href="#1552">1552</a> <strong class="jxr_keyword">return</strong> workerGraphPartitioner.getPartitionOwner(vertexId);
-<a name="1553" href="#1553">1553</a> }
-<a name="1554" href="#1554">1554</a>
-<a name="1555" href="#1555">1555</a> <em class="jxr_javadoccomment">/**</em>
-<a name="1556" href="#1556">1556</a> <em class="jxr_javadoccomment"> * Get the partition for a vertex index.</em>
-<a name="1557" href="#1557">1557</a> <em class="jxr_javadoccomment"> *</em>
-<a name="1558" href="#1558">1558</a> <em class="jxr_javadoccomment"> * @param vertexId Vertex index to search for the partition.</em>
-<a name="1559" href="#1559">1559</a> <em class="jxr_javadoccomment"> * @return Partition that owns this vertex.</em>
-<a name="1560" href="#1560">1560</a> <em class="jxr_javadoccomment"> */</em>
-<a name="1561" href="#1561">1561</a> <strong class="jxr_keyword">public</strong> Partition<I, V, E, M> getPartition(I vertexId) {
-<a name="1562" href="#1562">1562</a> <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> partitionOwner = getVertexPartitionOwner(vertexId);
-<a name="1563" href="#1563">1563</a> <strong class="jxr_keyword">return</strong> workerPartitionMap.get(partitionOwner.getPartitionId());
-<a name="1564" href="#1564">1564</a> }
-<a name="1565" href="#1565">1565</a>
-<a name="1566" href="#1566">1566</a> @Override
-<a name="1567" href="#1567">1567</a> <strong class="jxr_keyword">public</strong> Vertex<I, V, E, M> getVertex(I vertexId) {
-<a name="1568" href="#1568">1568</a> <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> partitionOwner = getVertexPartitionOwner(vertexId);
-<a name="1569" href="#1569">1569</a> <strong class="jxr_keyword">if</strong> (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
-<a name="1570" href="#1570">1570</a> <strong class="jxr_keyword">return</strong> workerPartitionMap.get(
-<a name="1571" href="#1571">1571</a> partitionOwner.getPartitionId()).getVertex(vertexId);
-<a name="1572" href="#1572">1572</a> } <strong class="jxr_keyword">else</strong> {
-<a name="1573" href="#1573">1573</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">null</strong>;
-<a name="1574" href="#1574">1574</a> }
-<a name="1575" href="#1575">1575</a> }
-<a name="1576" href="#1576">1576</a>
-<a name="1577" href="#1577">1577</a> @Override
-<a name="1578" href="#1578">1578</a> <strong class="jxr_keyword">public</strong> ServerData<I, V, E, M> getServerData() {
-<a name="1579" href="#1579">1579</a> <strong class="jxr_keyword">return</strong> commService.getServerData();
-<a name="1580" href="#1580">1580</a> }
-<a name="1581" href="#1581">1581</a> }
+<a name="1536" href="#1536">1536</a> <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">boolean</strong> processEvent(WatchedEvent event) {
+<a name="1537" href="#1537">1537</a> <strong class="jxr_keyword">boolean</strong> foundEvent = false;
+<a name="1538" href="#1538">1538</a> <strong class="jxr_keyword">if</strong> (event.getPath().startsWith(masterJobStatePath) &&
+<a name="1539" href="#1539">1539</a> (event.getType() == EventType.NodeChildrenChanged)) {
+<a name="1540" href="#1540">1540</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1541" href="#1541">1541</a> LOG.info(<span class="jxr_string">"processEvent: Job state changed, checking "</span> +
+<a name="1542" href="#1542">1542</a> <span class="jxr_string">"to see if it needs to restart"</span>);
+<a name="1543" href="#1543">1543</a> }
+<a name="1544" href="#1544">1544</a> JSONObject jsonObj = getJobState();
+<a name="1545" href="#1545">1545</a> <strong class="jxr_keyword">try</strong> {
+<a name="1546" href="#1546">1546</a> <strong class="jxr_keyword">if</strong> ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
+<a name="1547" href="#1547">1547</a> ApplicationState.START_SUPERSTEP) &&
+<a name="1548" href="#1548">1548</a> jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
+<a name="1549" href="#1549">1549</a> getApplicationAttempt()) {
+<a name="1550" href="#1550">1550</a> LOG.fatal(<span class="jxr_string">"processEvent: Worker will restart "</span> +
+<a name="1551" href="#1551">1551</a> <span class="jxr_string">"from command - "</span> + jsonObj.toString());
+<a name="1552" href="#1552">1552</a> System.exit(-1);
+<a name="1553" href="#1553">1553</a> }
+<a name="1554" href="#1554">1554</a> } <strong class="jxr_keyword">catch</strong> (JSONException e) {
+<a name="1555" href="#1555">1555</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(
+<a name="1556" href="#1556">1556</a> <span class="jxr_string">"processEvent: Couldn't properly get job state from "</span> +
+<a name="1557" href="#1557">1557</a> jsonObj.toString());
+<a name="1558" href="#1558">1558</a> }
+<a name="1559" href="#1559">1559</a> foundEvent = <strong class="jxr_keyword">true</strong>;
+<a name="1560" href="#1560">1560</a> } <strong class="jxr_keyword">else</strong> <strong class="jxr_keyword">if</strong> (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
+<a name="1561" href="#1561">1561</a> event.getType() == EventType.NodeChildrenChanged) {
+<a name="1562" href="#1562">1562</a> <strong class="jxr_keyword">if</strong> (LOG.isInfoEnabled()) {
+<a name="1563" href="#1563">1563</a> LOG.info(<span class="jxr_string">"processEvent : partitionExchangeChildrenChanged "</span> +
+<a name="1564" href="#1564">1564</a> <span class="jxr_string">"(at least one worker is done sending partitions)"</span>);
+<a name="1565" href="#1565">1565</a> }
+<a name="1566" href="#1566">1566</a> partitionExchangeChildrenChanged.signal();
+<a name="1567" href="#1567">1567</a> foundEvent = <strong class="jxr_keyword">true</strong>;
+<a name="1568" href="#1568">1568</a> }
+<a name="1569" href="#1569">1569</a>
+<a name="1570" href="#1570">1570</a> <strong class="jxr_keyword">return</strong> foundEvent;
+<a name="1571" href="#1571">1571</a> }
+<a name="1572" href="#1572">1572</a>
+<a name="1573" href="#1573">1573</a> @Override
+<a name="1574" href="#1574">1574</a> <strong class="jxr_keyword">public</strong> <a href="../../../../org/apache/giraph/graph/WorkerInfo.html">WorkerInfo</a> getWorkerInfo() {
+<a name="1575" href="#1575">1575</a> <strong class="jxr_keyword">return</strong> workerInfo;
+<a name="1576" href="#1576">1576</a> }
+<a name="1577" href="#1577">1577</a>
+<a name="1578" href="#1578">1578</a> @Override
+<a name="1579" href="#1579">1579</a> <strong class="jxr_keyword">public</strong> Map<Integer, Partition<I, V, E, M>> getPartitionMap() {
+<a name="1580" href="#1580">1580</a> <strong class="jxr_keyword">return</strong> workerPartitionMap;
+<a name="1581" href="#1581">1581</a> }
+<a name="1582" href="#1582">1582</a>
+<a name="1583" href="#1583">1583</a> @Override
+<a name="1584" href="#1584">1584</a> <strong class="jxr_keyword">public</strong> Collection<? <strong class="jxr_keyword">extends</strong> PartitionOwner> getPartitionOwners() {
+<a name="1585" href="#1585">1585</a> <strong class="jxr_keyword">return</strong> workerGraphPartitioner.getPartitionOwners();
+<a name="1586" href="#1586">1586</a> }
+<a name="1587" href="#1587">1587</a>
+<a name="1588" href="#1588">1588</a> @Override
+<a name="1589" href="#1589">1589</a> <strong class="jxr_keyword">public</strong> <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> getVertexPartitionOwner(I vertexId) {
+<a name="1590" href="#1590">1590</a> <strong class="jxr_keyword">return</strong> workerGraphPartitioner.getPartitionOwner(vertexId);
+<a name="1591" href="#1591">1591</a> }
+<a name="1592" href="#1592">1592</a>
+<a name="1593" href="#1593">1593</a> <em class="jxr_javadoccomment">/**</em>
+<a name="1594" href="#1594">1594</a> <em class="jxr_javadoccomment"> * Get the partition for a vertex index.</em>
+<a name="1595" href="#1595">1595</a> <em class="jxr_javadoccomment"> *</em>
+<a name="1596" href="#1596">1596</a> <em class="jxr_javadoccomment"> * @param vertexId Vertex index to search for the partition.</em>
+<a name="1597" href="#1597">1597</a> <em class="jxr_javadoccomment"> * @return Partition that owns this vertex.</em>
+<a name="1598" href="#1598">1598</a> <em class="jxr_javadoccomment"> */</em>
+<a name="1599" href="#1599">1599</a> <strong class="jxr_keyword">public</strong> Partition<I, V, E, M> getPartition(I vertexId) {
+<a name="1600" href="#1600">1600</a> <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> partitionOwner = getVertexPartitionOwner(vertexId);
+<a name="1601" href="#1601">1601</a> <strong class="jxr_keyword">return</strong> workerPartitionMap.get(partitionOwner.getPartitionId());
+<a name="1602" href="#1602">1602</a> }
+<a name="1603" href="#1603">1603</a>
+<a name="1604" href="#1604">1604</a> @Override
+<a name="1605" href="#1605">1605</a> <strong class="jxr_keyword">public</strong> Vertex<I, V, E, M> getVertex(I vertexId) {
+<a name="1606" href="#1606">1606</a> <a href="../../../../org/apache/giraph/graph/partition/PartitionOwner.html">PartitionOwner</a> partitionOwner = getVertexPartitionOwner(vertexId);
+<a name="1607" href="#1607">1607</a> <strong class="jxr_keyword">if</strong> (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
+<a name="1608" href="#1608">1608</a> <strong class="jxr_keyword">return</strong> workerPartitionMap.get(
+<a name="1609" href="#1609">1609</a> partitionOwner.getPartitionId()).getVertex(vertexId);
+<a name="1610" href="#1610">1610</a> } <strong class="jxr_keyword">else</strong> {
+<a name="1611" href="#1611">1611</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">null</strong>;
+<a name="1612" href="#1612">1612</a> }
+<a name="1613" href="#1613">1613</a> }
+<a name="1614" href="#1614">1614</a>
+<a name="1615" href="#1615">1615</a> @Override
+<a name="1616" href="#1616">1616</a> <strong class="jxr_keyword">public</strong> ServerData<I, V, E, M> getServerData() {
+<a name="1617" href="#1617">1617</a> <strong class="jxr_keyword">return</strong> commService.getServerData();
+<a name="1618" href="#1618">1618</a> }
+<a name="1619" href="#1619">1619</a> }
</pre>
<hr/><div id="footer">This page was automatically generated by <a href="http://maven.apache.org/">Maven</a></div></body>
</html>