You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/03 04:57:51 UTC
svn commit: r1393264 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/graph/
src/test/java/org/apache/giraph/ src/test/java/org/apache/giraph/graph/
Author: aching
Date: Wed Oct 3 02:57:50 2012
New Revision: 1393264
URL: http://svn.apache.org/viewvc?rev=1393264&view=rev
Log:
GIRAPH-293: Should aggregators be checkpointed?
Added:
giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
Removed:
giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct 3 02:57:50 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-293: Should aggregators be checkpointed? (majakabiljo via
+ aching)
+
GIRAPH-355: Partition.readFields crashes. (maja via aching)
GIRAPH-354: Giraph Formats should use hcatalog-core. (nitayj via
Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Wed Oct 3 02:57:50 2012
@@ -37,7 +37,7 @@ import org.apache.zookeeper.KeeperExcept
@SuppressWarnings("rawtypes")
public interface CentralizedServiceMaster<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> extends
- CentralizedService<I, V, E, M>, MasterAggregatorUsage {
+ CentralizedService<I, V, E, M> {
/**
* Become the master.
* @return true if became the master, false if the application is done.
@@ -92,4 +92,11 @@ public interface CentralizedServiceMaste
void setJobState(ApplicationState state,
long applicationAttempt,
long desiredSuperstep);
+
+ /**
+ * Get master aggregator usage
+ *
+ * @return Master aggregator usage
+ */
+ MasterAggregatorUsage getAggregatorUsage();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Wed Oct 3 02:57:50 2012
@@ -48,7 +48,7 @@ import org.apache.giraph.graph.WorkerCon
@SuppressWarnings("rawtypes")
public interface CentralizedServiceWorker<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- extends CentralizedService<I, V, E, M>, WorkerAggregatorUsage {
+ extends CentralizedService<I, V, E, M> {
/**
* Get the worker information
*
@@ -201,4 +201,11 @@ public interface CentralizedServiceWorke
* @return Server data
*/
ServerData<I, V, E, M> getServerData();
+
+ /**
+ * Get worker aggregator usage
+ *
+ * @return Worker aggregator usage
+ */
+ WorkerAggregatorUsage getAggregatorUsage();
}
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java?rev=1393264&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java Wed Oct 3 02:57:50 2012
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * Class which handles all the actions with aggregators
+ */
+public abstract class AggregatorHandler {
+ /** Map of aggregators */
+ private final Map<String, AggregatorWrapper<Writable>> aggregatorMap;
+
+ /**
+ * Default constructor
+ */
+ protected AggregatorHandler() {
+ aggregatorMap = Maps.newHashMap();
+ }
+
+ /**
+ * Get value of an aggregator.
+ *
+ * @param name Name of aggregator
+ * @param <A> Type of the aggregated value
+ * @return Previous aggregated value, or null if aggregator isn't registered
+ */
+ public <A extends Writable> A getAggregatedValue(String name) {
+ AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+ if (aggregator == null) {
+ return null;
+ } else {
+ return (A) aggregator.getPreviousAggregatedValue();
+ }
+ }
+
+ /**
+ * Get aggregator by name.
+ *
+ * @param name Name of aggregator
+ * @return Aggregator or null when not registered
+ */
+ protected AggregatorWrapper<Writable> getAggregator(String name) {
+ return aggregatorMap.get(name);
+ }
+
+ /**
+ * Get map of aggregators
+ *
+ * @return Aggregators map
+ */
+ protected Map<String, AggregatorWrapper<Writable>> getAggregatorMap() {
+ return aggregatorMap;
+ }
+
+ /**
+ * Register an aggregator with name and class.
+ *
+ * @param <A> Aggregator type
+ * @param name Name of the aggregator
+ * @param aggregatorClass Class of the aggregator
+ * @param persistent False iff aggregator should be reset at the end of
+ * every super step
+ * @return Aggregator, or null if the name is already registered
+ * @throws IllegalAccessException
+ * @throws InstantiationException
+ */
+ protected <A extends Writable> AggregatorWrapper<A> registerAggregator(
+ String name, Class<? extends Aggregator<A>> aggregatorClass,
+ boolean persistent) throws InstantiationException,
+ IllegalAccessException {
+ if (getAggregator(name) != null) {
+ return null;
+ }
+ AggregatorWrapper<A> aggregator =
+ new AggregatorWrapper<A>(aggregatorClass, persistent);
+ AggregatorWrapper<Writable> writableAggregator =
+ (AggregatorWrapper<Writable>) aggregator;
+ aggregatorMap.put(name, writableAggregator);
+ return aggregator;
+ }
+
+ /**
+ * Register an aggregator with name and className.
+ *
+ * @param <A> Aggregator type
+ * @param name Name of the aggregator
+ * @param aggregatorClassName Name of the aggregator class
+ * @param persistent False iff aggregator should be reset at the end of
+ * every super step
+ * @return Aggregator registered under the name (existing or newly created)
+ */
+ protected <A extends Writable> AggregatorWrapper<A> registerAggregator(
+ String name, String aggregatorClassName, boolean persistent) {
+ AggregatorWrapper<Writable> aggregatorWrapper = getAggregator(name);
+ if (aggregatorWrapper == null) {
+ try {
+ Class<? extends Aggregator<Writable>> aggregatorClass =
+ (Class<? extends Aggregator<Writable>>)
+ Class.forName(aggregatorClassName);
+ aggregatorWrapper =
+ registerAggregator(name, aggregatorClass, persistent);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Failed to create aggregator " +
+ name + " of class " + aggregatorClassName +
+ " with ClassNotFoundException", e);
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("Failed to create aggregator " +
+ name + " of class " + aggregatorClassName +
+ " with InstantiationException", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Failed to create aggregator " +
+ name + " of class " + aggregatorClassName +
+ " with IllegalAccessException", e);
+ }
+ }
+ return (AggregatorWrapper<A>) aggregatorWrapper;
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Wed Oct 3 02:57:50 2012
@@ -49,8 +49,6 @@ import java.security.InvalidParameterExc
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
/**
* Zookeeper-based implementation of {@link CentralizedService}.
@@ -161,13 +159,6 @@ public abstract class BspService<I exten
/** JSON superstep key */
public static final String JSONOBJ_SUPERSTEP_KEY =
"_superstepKey";
- /** Aggregator name key */
- public static final String AGGREGATOR_NAME_KEY = "_aggregatorNameKey";
- /** Aggregator class name key */
- public static final String AGGREGATOR_CLASS_NAME_KEY =
- "_aggregatorClassNameKey";
- /** Aggregator value key */
- public static final String AGGREGATOR_VALUE_KEY = "_aggregatorValueKey";
/** Suffix denotes a worker */
public static final String WORKER_SUFFIX = "_worker";
/** Suffix denotes a master */
@@ -266,9 +257,6 @@ public abstract class BspService<I exten
private final FileSystem fs;
/** Checkpoint frequency */
private final int checkpointFrequency;
- /** Map of aggregators */
- private Map<String, AggregatorWrapper<Writable>> aggregatorMap =
- new TreeMap<String, AggregatorWrapper<Writable>>();
/**
* Constructor.
@@ -903,71 +891,6 @@ public abstract class BspService<I exten
}
/**
- * Register an aggregator with name.
- *
- * @param <A> Aggregator type
- * @param name Name of the aggregator
- * @param aggregatorClass Class of the aggregator
- * @param persistent False iff aggregator should be reset at the end of
- * every super step
- * @return Aggregator
- * @throws IllegalAccessException
- * @throws InstantiationException
- */
- protected <A extends Writable> AggregatorWrapper<A> registerAggregator(
- String name, Class<? extends Aggregator<A>> aggregatorClass,
- boolean persistent) throws InstantiationException,
- IllegalAccessException {
- if (aggregatorMap.get(name) != null) {
- return null;
- }
- AggregatorWrapper<A> aggregator =
- new AggregatorWrapper<A>(aggregatorClass, persistent);
- AggregatorWrapper<Writable> writableAggregator =
- (AggregatorWrapper<Writable>) aggregator;
- aggregatorMap.put(name, writableAggregator);
- if (LOG.isInfoEnabled()) {
- LOG.info("registerAggregator: registered " + name);
- }
- return aggregator;
- }
-
- /**
- * Get aggregator by name.
- *
- * @param name Name of aggregator
- * @return Aggregator or null when not registered
- */
- protected AggregatorWrapper<? extends Writable> getAggregator(String name) {
- return aggregatorMap.get(name);
- }
-
- /**
- * Get value of an aggregator.
- *
- * @param name Name of aggregator
- * @param <A> Aggregated value
- * @return Value of the aggregator
- */
- public <A extends Writable> A getAggregatedValue(String name) {
- AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
- if (aggregator == null) {
- return null;
- } else {
- return (A) aggregator.getPreviousAggregatedValue();
- }
- }
-
- /**
- * Get the aggregator map.
- *
- * @return Map of aggregator names to aggregator
- */
- protected Map<String, AggregatorWrapper<Writable>> getAggregatorMap() {
- return aggregatorMap;
- }
-
- /**
* Register a BspEvent. Ensure that it will be signaled
* by catastrophic failure so that threads waiting on an event signal
* will be unblocked.
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Oct 3 02:57:50 2012
@@ -58,13 +58,11 @@ import org.json.JSONObject;
import net.iharder.Base64;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -72,16 +70,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
/**
* ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
*
@@ -147,8 +141,8 @@ public class BspServiceMaster<I extends
/** All the partition stats from the last superstep */
private final List<PartitionStats> allPartitionStatsList =
new ArrayList<PartitionStats>();
- /** Aggregator writer */
- private AggregatorWriter aggregatorWriter;
+ /** Handler for aggregators */
+ private MasterAggregatorHandler aggregatorHandler;
/** Master class */
private MasterCompute masterCompute;
/** Communication service */
@@ -581,6 +575,11 @@ public class BspServiceMaster<I extends
return splitList.size();
}
+ @Override
+ public MasterAggregatorUsage getAggregatorUsage() {
+ return aggregatorHandler;
+ }
+
/**
* Read the finalized checkpoint file and associated metadata files for the
* checkpoint. Modifies the {@link PartitionOwner} objects to get the
@@ -615,37 +614,8 @@ public class BspServiceMaster<I extends
validMetadataPathList.add(new Path(metadataFilePath));
}
- // Set the merged aggregator data if it exists.
- int aggregatorDataSize = finalizedStream.readInt();
- if (aggregatorDataSize > 0) {
- byte [] aggregatorZkData = new byte[aggregatorDataSize];
- int actualDataRead =
- finalizedStream.read(aggregatorZkData, 0, aggregatorDataSize);
- if (actualDataRead != aggregatorDataSize) {
- throw new RuntimeException(
- "prepareCheckpointRestart: Only read " + actualDataRead +
- " of " + aggregatorDataSize + " aggregator bytes from " +
- finalizedCheckpointPath);
- }
- String mergedAggregatorPath =
- getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
- if (LOG.isInfoEnabled()) {
- LOG.info("prepareCheckpointRestart: Reloading merged " +
- "aggregator " + "data '" +
- Arrays.toString(aggregatorZkData) +
- "' to previous checkpoint in path " +
- mergedAggregatorPath);
- }
- if (getZkExt().exists(mergedAggregatorPath, false) == null) {
- getZkExt().createExt(mergedAggregatorPath,
- aggregatorZkData,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } else {
- getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1);
- }
- }
+ aggregatorHandler.readFields(finalizedStream);
+ aggregatorHandler.finishSuperstep(superstep - 1, this);
masterCompute.readFields(finalizedStream);
finalizedStream.close();
@@ -763,14 +733,8 @@ public class BspServiceMaster<I extends
getTaskPartition() -
currentMasterTaskPartitionCounter.getValue());
masterCompute = getConfiguration().createMasterCompute();
- aggregatorWriter = getConfiguration().createAggregatorWriter();
- try {
- aggregatorWriter.initialize(getContext(),
- getApplicationAttempt());
- } catch (IOException e) {
- throw new IllegalStateException("becomeMaster: " +
- "Couldn't initialize aggregatorWriter", e);
- }
+ aggregatorHandler = new MasterAggregatorHandler(getConfiguration());
+ aggregatorHandler.initialize(this);
if (getConfiguration().getUseNetty()) {
commService = new NettyMasterClientServer(
@@ -870,165 +834,6 @@ public class BspServiceMaster<I extends
}
/**
- * Get the aggregator values for a particular superstep and aggregate them.
- *
- * @param superstep superstep to check
- */
- private void collectAndProcessAggregatorValues(long superstep) {
- String workerFinishedPath =
- getWorkerFinishedPath(getApplicationAttempt(), superstep);
- List<String> hostnameIdPathList = null;
- try {
- hostnameIdPathList =
- getZkExt().getChildrenExt(
- workerFinishedPath, false, false, true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: InterruptedException", e);
- }
-
- for (String hostnameIdPath : hostnameIdPathList) {
- JSONObject workerFinishedInfoObj = null;
- byte[] aggregatorArray = null;
- try {
- byte [] zkData =
- getZkExt().getData(hostnameIdPath, false, null);
- workerFinishedInfoObj = new JSONObject(new String(zkData));
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: InterruptedException",
- e);
- } catch (JSONException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: JSONException", e);
- }
- try {
- aggregatorArray = Base64.decode(workerFinishedInfoObj.getString(
- JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY));
- } catch (JSONException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("collectAndProcessAggregatorValues: " +
- "No aggregators" + " for " + hostnameIdPath);
- }
- continue;
- } catch (IOException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: IOException", e);
- }
-
- DataInputStream input =
- new DataInputStream(new ByteArrayInputStream(aggregatorArray));
- try {
- while (input.available() > 0) {
- String aggregatorName = input.readUTF();
- AggregatorWrapper<Writable> aggregator =
- getAggregatorMap().get(aggregatorName);
- if (aggregator == null) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "Master received aggregator which isn't registered: " +
- aggregatorName);
- }
- Writable aggregatorValue = aggregator.createInitialValue();
- aggregatorValue.readFields(input);
- aggregator.aggregateCurrent(aggregatorValue);
- }
- } catch (IOException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "IOException when reading aggregator data", e);
- }
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("collectAndProcessAggregatorValues: Processed aggregators");
- }
-
- // prepare aggregators for master compute
- for (AggregatorWrapper<Writable> aggregator :
- getAggregatorMap().values()) {
- if (aggregator.isPersistent()) {
- aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
- }
- aggregator.setPreviousAggregatedValue(
- aggregator.getCurrentAggregatedValue());
- aggregator.resetCurrentAggregator();
- }
- }
-
- /**
- * Save the supplied aggregator values.
- *
- * @param superstep superstep for which to save values
- */
- private void saveAggregatorValues(long superstep) {
- Map<String, AggregatorWrapper<Writable>> aggregatorMap =
- getAggregatorMap();
-
- for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
- if (aggregator.isChanged()) {
- // if master compute changed the value, use the one he chose
- aggregator.setPreviousAggregatedValue(
- aggregator.getCurrentAggregatedValue());
- // reset aggregator for the next superstep
- aggregator.resetCurrentAggregator();
- }
- }
-
- if (aggregatorMap.size() > 0) {
- String mergedAggregatorPath =
- getMergedAggregatorPath(getApplicationAttempt(), superstep);
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutput output = new DataOutputStream(outputStream);
- try {
- output.writeInt(aggregatorMap.size());
- } catch (IOException e) {
- e.printStackTrace();
- }
- for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
- aggregatorMap.entrySet()) {
- try {
- output.writeUTF(entry.getKey());
- output.writeUTF(entry.getValue().getAggregatorClass().getName());
- entry.getValue().getPreviousAggregatedValue().write(output);
- } catch (IOException e) {
- throw new IllegalStateException("saveAggregatorValues: " +
- "IllegalStateException", e);
- }
- }
-
- try {
- getZkExt().createExt(mergedAggregatorPath,
- outputStream.toByteArray(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException.NodeExistsException e) {
- LOG.warn("saveAggregatorValues: " +
- mergedAggregatorPath + " already exists!");
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "saveAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "saveAggregatorValues: IllegalStateException",
- e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("saveAggregatorValues: Finished loading " +
- mergedAggregatorPath);
- }
- }
- }
-
- /**
* Finalize the checkpoint file prefixes by taking the chosen workers and
* writing them to a finalized file. Also write out the master
* aggregated aggregator array from the previous superstep.
@@ -1056,7 +861,7 @@ public class BspServiceMaster<I extends
// <global statistics>
// <number of files>
// <used file prefix 0><used file prefix 1>...
- // <aggregator data length><aggregators as a serialized JSON byte array>
+ // <aggregator data>
// <masterCompute data>
FSDataOutputStream finalizedOutputStream =
getFs().create(finalizedCheckpointPath);
@@ -1073,16 +878,7 @@ public class BspServiceMaster<I extends
chosenWorkerInfo.getHostnameId();
finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
}
- String mergedAggregatorPath =
- getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
- if (getZkExt().exists(mergedAggregatorPath, false) != null) {
- byte [] aggregatorZkData =
- getZkExt().getData(mergedAggregatorPath, false, null);
- finalizedOutputStream.writeInt(aggregatorZkData.length);
- finalizedOutputStream.write(aggregatorZkData);
- } else {
- finalizedOutputStream.writeInt(0);
- }
+ aggregatorHandler.write(finalizedOutputStream);
masterCompute.write(finalizedOutputStream);
finalizedOutputStream.close();
lastCheckpointedSuperstep = superstep;
@@ -1512,9 +1308,9 @@ public class BspServiceMaster<I extends
// Collect aggregator values, then run the master.compute() and
// finally save the aggregator values
- collectAndProcessAggregatorValues(getSuperstep());
+ aggregatorHandler.prepareSuperstep(getSuperstep(), this);
runMasterCompute(getSuperstep());
- saveAggregatorValues(getSuperstep());
+ aggregatorHandler.finishSuperstep(getSuperstep(), this);
// If the master is halted or all the vertices voted to halt and there
// are no more messages in the system, stop the computation
@@ -1546,28 +1342,7 @@ public class BspServiceMaster<I extends
} else {
superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
}
- try {
- Iterable<Map.Entry<String, Writable>> iter =
- Iterables.transform(
- getAggregatorMap().entrySet(),
- new Function<Entry<String, AggregatorWrapper<Writable>>,
- Entry<String, Writable>>() {
- @Override
- public Entry<String, Writable> apply(
- Entry<String, AggregatorWrapper<Writable>> entry) {
- return new AbstractMap.SimpleEntry<String,
- Writable>(entry.getKey(),
- entry.getValue().getPreviousAggregatedValue());
- }
- });
- aggregatorWriter.writeAggregator(iter,
- (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
- AggregatorWriter.LAST_SUPERSTEP : getSuperstep());
- } catch (IOException e) {
- throw new IllegalStateException(
- "coordinateSuperstep: IOException while " +
- "writing aggregators data", e);
- }
+ aggregatorHandler.writeAggregators(getSuperstep(), superstepState);
return superstepState;
}
@@ -1736,7 +1511,7 @@ public class BspServiceMaster<I extends
" succeeded ");
}
}
- aggregatorWriter.close();
+ aggregatorHandler.close();
if (getConfiguration().getUseNetty()) {
commService.closeConnections();
@@ -1836,31 +1611,6 @@ public class BspServiceMaster<I extends
return foundEvent;
}
- @Override
- public <A extends Writable> boolean registerAggregator(String name,
- Class<? extends Aggregator<A>> aggregatorClass) throws
- InstantiationException, IllegalAccessException {
- return registerAggregator(name, aggregatorClass, false) != null;
- }
-
- @Override
- public <A extends Writable> boolean registerPersistentAggregator(String name,
- Class<? extends Aggregator<A>> aggregatorClass) throws
- InstantiationException, IllegalAccessException {
- return registerAggregator(name, aggregatorClass, true) != null;
- }
-
- @Override
- public <A extends Writable> void setAggregatedValue(String name, A value) {
- AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
- if (aggregator == null) {
- throw new IllegalStateException(
- "setAggregatedValue: Tried to set value of aggregator which wasn't" +
- " registered " + name);
- }
- ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
- }
-
/**
* Set values of counters to match the ones from {@link GlobalStats}
*
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 3 02:57:50 2012
@@ -59,7 +59,6 @@ import net.iharder.Base64;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -123,6 +122,8 @@ public class BspServiceWorker<I extends
* Partition store for worker (only used by the Hadoop RPC implementation).
*/
private final PartitionStore<I, V, E, M> workerPartitionStore;
+ /** Handler for aggregators */
+ private final WorkerAggregatorHandler aggregatorHandler;
/**
* Constructor for setting up the worker.
@@ -185,6 +186,8 @@ public class BspServiceWorker<I extends
new SimplePartitionStore<I, V, E, M>(getConfiguration(),
getContext());
}
+
+ aggregatorHandler = new WorkerAggregatorHandler();
}
public WorkerContext getWorkerContext() {
@@ -700,134 +703,6 @@ public class BspServiceWorker<I extends
finishSuperstep(partitionStatsList);
}
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
- if (aggregator != null) {
- ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
- } else {
- throw new IllegalStateException("aggregate: Tried to aggregate value " +
- "to unregistered aggregator " + name);
- }
- }
-
- /**
- * Marshal the aggregator values of the worker to a byte array that will
- * later be aggregated by master.
- *
- * @param superstep Superstep to marshall on
- * @return Byte array of the aggreagtor values
- */
- private byte[] marshalAggregatorValues(long superstep) {
- if (superstep == INPUT_SUPERSTEP) {
- return new byte[0];
- }
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream output = new DataOutputStream(outputStream);
- for (Entry<String, AggregatorWrapper<Writable>> entry :
- getAggregatorMap().entrySet()) {
- if (entry.getValue().isChanged()) {
- try {
- output.writeUTF(entry.getKey());
- entry.getValue().getCurrentAggregatedValue().write(output);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to marshall aggregator " +
- "with IOException " + entry.getKey(), e);
- }
- }
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "marshalAggregatorValues: Finished assembling aggregator values");
- }
- return outputStream.toByteArray();
- }
-
- /**
- * Get values of aggregators aggregated by master in previous superstep.
- *
- * @param superstep Superstep to get the aggregated values from
- */
- private void getAggregatorValues(long superstep) {
- // prepare aggregators for reading and next superstep
- for (AggregatorWrapper<Writable> aggregator :
- getAggregatorMap().values()) {
- aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
- aggregator.resetCurrentAggregator();
- }
- String mergedAggregatorPath =
- getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
-
- byte[] aggregatorArray = null;
- try {
- aggregatorArray = getZkExt().getData(mergedAggregatorPath, false, null);
- } catch (KeeperException.NoNodeException e) {
- LOG.info("getAggregatorValues: no aggregators in " +
- mergedAggregatorPath + " on superstep " + superstep);
- return;
- } catch (KeeperException e) {
- throw new IllegalStateException("Failed to get data for " +
- mergedAggregatorPath + " with KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("Failed to get data for " +
- mergedAggregatorPath + " with InterruptedException", e);
- }
-
- DataInput input =
- new DataInputStream(new ByteArrayInputStream(aggregatorArray));
- int numAggregators = 0;
-
- try {
- numAggregators = input.readInt();
- } catch (IOException e) {
- throw new IllegalStateException("getAggregatorValues: " +
- "Failed to decode data", e);
- }
-
- for (int i = 0; i < numAggregators; i++) {
- try {
- String aggregatorName = input.readUTF();
- String aggregatorClassName = input.readUTF();
- AggregatorWrapper<Writable> aggregatorWrapper =
- getAggregatorMap().get(aggregatorName);
- if (aggregatorWrapper == null) {
- try {
- Class<? extends Aggregator<Writable>> aggregatorClass =
- (Class<? extends Aggregator<Writable>>)
- Class.forName(aggregatorClassName);
- aggregatorWrapper =
- registerAggregator(aggregatorName, aggregatorClass, false);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException("Failed to create aggregator " +
- aggregatorName + " of class " + aggregatorClassName +
- " with ClassNotFoundException", e);
- } catch (InstantiationException e) {
- throw new IllegalStateException("Failed to create aggregator " +
- aggregatorName + " of class " + aggregatorClassName +
- " with InstantiationException", e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("Failed to create aggregator " +
- aggregatorName + " of class " + aggregatorClassName +
- " with IllegalAccessException", e);
- }
- }
- Writable aggregatorValue = aggregatorWrapper.createInitialValue();
- aggregatorValue.readFields(input);
- aggregatorWrapper.setPreviousAggregatedValue(aggregatorValue);
- } catch (IOException e) {
- throw new IllegalStateException(
- "Failed to decode data for index " + i, e);
- }
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("getAggregatorValues: Finished loading " +
- mergedAggregatorPath);
- }
- }
-
/**
* Register the health of this worker for a given superstep
*
@@ -967,7 +842,7 @@ public class BspServiceWorker<I extends
}
if (getSuperstep() != INPUT_SUPERSTEP) {
- getAggregatorValues(getSuperstep());
+ aggregatorHandler.prepareSuperstep(getSuperstep(), this);
}
getContext().setStatus("startSuperstep: " +
getGraphMapper().getMapFunctions().toString() +
@@ -1017,7 +892,7 @@ public class BspServiceWorker<I extends
}
byte[] aggregatorArray =
- marshalAggregatorValues(getSuperstep());
+ aggregatorHandler.finishSuperstep(getSuperstep());
Collection<PartitionStats> finalizedPartitionStats =
workerGraphPartitioner.finalizePartitionStats(
partitionStatsList, getPartitionStore());
@@ -1618,4 +1493,9 @@ public class BspServiceWorker<I extends
public ServerData<I, V, E, M> getServerData() {
return commService.getServerData();
}
+
+ @Override
+ public WorkerAggregatorUsage getAggregatorUsage() {
+ return aggregatorHandler;
+ }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Wed Oct 3 02:57:50 2012
@@ -119,7 +119,7 @@ public class GraphMapper<I extends Writa
* @return Worker aggregator usage interface
*/
public final WorkerAggregatorUsage getWorkerAggregatorUsage() {
- return serviceWorker;
+ return serviceWorker.getAggregatorUsage();
}
/**
@@ -128,7 +128,7 @@ public class GraphMapper<I extends Writa
* @return Master aggregator usage interface
*/
public final MasterAggregatorUsage getMasterAggregatorUsage() {
- return serviceMaster;
+ return serviceMaster.getAggregatorUsage();
}
public final WorkerContext getWorkerContext() {
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java?rev=1393264&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java Wed Oct 3 02:57:50 2012
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.bsp.SuperstepState;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+import net.iharder.Base64;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+
+/** Master implementation of {@link AggregatorHandler} */
+public class MasterAggregatorHandler extends AggregatorHandler implements
+ MasterAggregatorUsage, Writable {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(MasterAggregatorHandler.class);
+ /** Aggregator writer */
+ private final AggregatorWriter aggregatorWriter;
+
+ /**
+ * @param config Hadoop configuration
+ */
+ public MasterAggregatorHandler(Configuration config) {
+ aggregatorWriter = BspUtils.createAggregatorWriter(config);
+ }
+
+ @Override
+ public <A extends Writable> void setAggregatedValue(String name, A value) {
+ AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+ if (aggregator == null) {
+ throw new IllegalStateException(
+ "setAggregatedValue: Tried to set value of aggregator which wasn't" +
+ " registered " + name);
+ }
+ ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
+ }
+
+ @Override
+ public <A extends Writable> boolean registerAggregator(String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException {
+ return registerAggregator(name, aggregatorClass, false) != null;
+ }
+
+ @Override
+ public <A extends Writable> boolean registerPersistentAggregator(String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException {
+ return registerAggregator(name, aggregatorClass, true) != null;
+ }
+
+ /**
+ * Get aggregator values supplied by workers for a particular superstep and
+ * aggregate them
+ *
+ * @param superstep Superstep which we are preparing for
+ * @param service BspService to get zookeeper info from
+ */
+ public void prepareSuperstep(long superstep, BspService service) {
+ String workerFinishedPath =
+ service.getWorkerFinishedPath(
+ service.getApplicationAttempt(), superstep);
+ List<String> hostnameIdPathList = null;
+ try {
+ hostnameIdPathList =
+ service.getZkExt().getChildrenExt(
+ workerFinishedPath, false, false, true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: InterruptedException", e);
+ }
+
+ for (String hostnameIdPath : hostnameIdPathList) {
+ JSONObject workerFinishedInfoObj = null;
+ byte[] aggregatorArray = null;
+ try {
+ byte[] zkData =
+ service.getZkExt().getData(hostnameIdPath, false, null);
+ workerFinishedInfoObj = new JSONObject(new String(zkData));
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: InterruptedException",
+ e);
+ } catch (JSONException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: JSONException", e);
+ }
+ try {
+ aggregatorArray = Base64.decode(workerFinishedInfoObj.getString(
+ service.JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY));
+ } catch (JSONException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("collectAndProcessAggregatorValues: " +
+ "No aggregators" + " for " + hostnameIdPath);
+ }
+ continue;
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: IOException", e);
+ }
+
+ DataInputStream input =
+ new DataInputStream(new ByteArrayInputStream(aggregatorArray));
+ try {
+ while (input.available() > 0) {
+ String aggregatorName = input.readUTF();
+ AggregatorWrapper<Writable> aggregator =
+ getAggregatorMap().get(aggregatorName);
+ if (aggregator == null) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "Master received aggregator which isn't registered: " +
+ aggregatorName);
+ }
+ Writable aggregatorValue = aggregator.createInitialValue();
+ aggregatorValue.readFields(input);
+ aggregator.aggregateCurrent(aggregatorValue);
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "IOException when reading aggregator data", e);
+ }
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("collectAndProcessAggregatorValues: Processed aggregators");
+ }
+
+ // prepare aggregators for master compute
+ for (AggregatorWrapper<Writable> aggregator :
+ getAggregatorMap().values()) {
+ if (aggregator.isPersistent()) {
+ aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
+ }
+ aggregator.setPreviousAggregatedValue(
+ aggregator.getCurrentAggregatedValue());
+ aggregator.resetCurrentAggregator();
+ }
+ }
+
+ /**
+ * Finalize the aggregator values of a superstep and save them in ZooKeeper.
+ *
+ * @param superstep Superstep which we are finishing.
+ * @param service BspService to get zookeeper info from
+ */
+ public void finishSuperstep(long superstep, BspService service) {
+ Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+ getAggregatorMap();
+
+ for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
+ if (aggregator.isChanged()) {
+ // if master compute changed the value, use the one it chose
+ aggregator.setPreviousAggregatedValue(
+ aggregator.getCurrentAggregatedValue());
+ // reset aggregator for the next superstep
+ aggregator.resetCurrentAggregator();
+ }
+ }
+
+ if (aggregatorMap.size() > 0) {
+ String mergedAggregatorPath =
+ service.getMergedAggregatorPath(
+ service.getApplicationAttempt(),
+ superstep);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutput output = new DataOutputStream(outputStream);
+ try {
+ output.writeInt(aggregatorMap.size());
+ } catch (IOException e) {
+ throw new IllegalStateException("saveAggregatorValues: IOException", e);
+ }
+ for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+ aggregatorMap.entrySet()) {
+ try {
+ output.writeUTF(entry.getKey());
+ output.writeUTF(entry.getValue().getAggregatorClass().getName());
+ entry.getValue().getPreviousAggregatedValue().write(output);
+ } catch (IOException e) {
+ throw new IllegalStateException("saveAggregatorValues: " +
+ "IOException", e);
+ }
+ }
+
+ try {
+ service.getZkExt().createExt(mergedAggregatorPath,
+ outputStream.toByteArray(),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.warn("saveAggregatorValues: " +
+ mergedAggregatorPath + " already exists!");
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "saveAggregatorValues: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "saveAggregatorValues: InterruptedException",
+ e);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("saveAggregatorValues: Finished saving " +
+ mergedAggregatorPath);
+ }
+ }
+ }
+
+ /**
+ * Write aggregators to {@link AggregatorWriter}
+ *
+ * @param superstep Superstep which just finished
+ * @param superstepState State of the superstep which just finished
+ */
+ public void writeAggregators(long superstep,
+ SuperstepState superstepState) {
+ try {
+ Iterable<Map.Entry<String, Writable>> iter =
+ Iterables.transform(
+ getAggregatorMap().entrySet(),
+ new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
+ Map.Entry<String, Writable>>() {
+ @Override
+ public Map.Entry<String, Writable> apply(
+ Map.Entry<String, AggregatorWrapper<Writable>> entry) {
+ return new AbstractMap.SimpleEntry<String,
+ Writable>(entry.getKey(),
+ entry.getValue().getPreviousAggregatedValue());
+ }
+ });
+ aggregatorWriter.writeAggregator(iter,
+ (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
+ AggregatorWriter.LAST_SUPERSTEP : superstep);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "coordinateSuperstep: IOException while " +
+ "writing aggregators data", e);
+ }
+ }
+
+ /**
+ * Initialize {@link AggregatorWriter}
+ *
+ * @param service BspService
+ */
+ public void initialize(BspService service) {
+ try {
+ aggregatorWriter.initialize(service.getContext(),
+ service.getApplicationAttempt());
+ } catch (IOException e) {
+ throw new IllegalStateException("MasterAggregatorHandler: " +
+ "Couldn't initialize aggregatorWriter", e);
+ }
+ }
+
+ /**
+ * Close {@link AggregatorWriter}
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ aggregatorWriter.close();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+ getAggregatorMap();
+ out.writeInt(aggregatorMap.size());
+ for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+ aggregatorMap.entrySet()) {
+ out.writeUTF(entry.getKey());
+ out.writeUTF(entry.getValue().getAggregatorClass().getName());
+ out.writeBoolean(entry.getValue().isPersistent());
+ entry.getValue().getPreviousAggregatedValue().write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+ getAggregatorMap();
+ aggregatorMap.clear();
+ int numAggregators = in.readInt();
+ for (int i = 0; i < numAggregators; i++) {
+ String aggregatorName = in.readUTF();
+ String aggregatorClassName = in.readUTF();
+ boolean isPersistent = in.readBoolean();
+ AggregatorWrapper<Writable> aggregator =
+ registerAggregator(aggregatorName, aggregatorClassName,
+ isPersistent);
+ Writable value = aggregator.createInitialValue();
+ value.readFields(in);
+ aggregator.setPreviousAggregatedValue(value);
+ }
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Wed Oct 3 02:57:50 2012
@@ -295,7 +295,7 @@ public abstract class Vertex<I extends W
*
* @param graphState Graph state for all workers
*/
- void setGraphState(GraphState<I, V, E, M> graphState) {
+ public void setGraphState(GraphState<I, V, E, M> graphState) {
this.graphState = graphState;
}
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java?rev=1393264&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java Wed Oct 3 02:57:50 2012
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Worker implementation of {@link AggregatorHandler}
+ */
+public class WorkerAggregatorHandler extends AggregatorHandler implements
+ WorkerAggregatorUsage {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(WorkerAggregatorHandler.class);
+
+ @Override
+ public <A extends Writable> void aggregate(String name, A value) {
+ AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+ if (aggregator != null) {
+ ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
+ } else {
+ throw new IllegalStateException("aggregate: Tried to aggregate value " +
+ "to unregistered aggregator " + name);
+ }
+ }
+
+ /**
+ * Get aggregator values aggregated by master in previous superstep
+ *
+ * @param superstep Superstep which we are preparing for
+ * @param service BspService to get zookeeper info from
+ */
+ public void prepareSuperstep(long superstep, BspService service) {
+ // prepare aggregators for reading and next superstep
+ for (AggregatorWrapper<Writable> aggregator :
+ getAggregatorMap().values()) {
+ aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
+ aggregator.resetCurrentAggregator();
+ }
+ String mergedAggregatorPath =
+ service.getMergedAggregatorPath(service.getApplicationAttempt(),
+ superstep - 1);
+
+ byte[] aggregatorArray;
+ try {
+ aggregatorArray =
+ service.getZkExt().getData(mergedAggregatorPath, false, null);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.info("getAggregatorValues: no aggregators in " +
+ mergedAggregatorPath + " on superstep " + superstep);
+ return;
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Failed to get data for " +
+ mergedAggregatorPath + " with KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Failed to get data for " +
+ mergedAggregatorPath + " with InterruptedException", e);
+ }
+
+ DataInput input =
+ new DataInputStream(new ByteArrayInputStream(aggregatorArray));
+ int numAggregators = 0;
+
+ try {
+ numAggregators = input.readInt();
+ } catch (IOException e) {
+ throw new IllegalStateException("getAggregatorValues: " +
+ "Failed to decode data", e);
+ }
+
+ for (int i = 0; i < numAggregators; i++) {
+ try {
+ String aggregatorName = input.readUTF();
+ String aggregatorClassName = input.readUTF();
+ AggregatorWrapper<Writable> aggregator =
+ registerAggregator(aggregatorName, aggregatorClassName, false);
+ Writable aggregatorValue = aggregator.createInitialValue();
+ aggregatorValue.readFields(input);
+ aggregator.setPreviousAggregatedValue(aggregatorValue);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Failed to decode data for index " + i, e);
+ }
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getAggregatorValues: Finished loading " +
+ mergedAggregatorPath);
+ }
+ }
+
+ /**
+ * Put aggregator values of the worker to a byte array that will later be
+ * aggregated by master.
+ *
+ * @param superstep Superstep which we are finishing.
+ * @return Byte array of the aggregator values
+ */
+ public byte[] finishSuperstep(long superstep) {
+ if (superstep == BspService.INPUT_SUPERSTEP) {
+ return new byte[0];
+ }
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(outputStream);
+ for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+ getAggregatorMap().entrySet()) {
+ if (entry.getValue().isChanged()) {
+ try {
+ output.writeUTF(entry.getKey());
+ entry.getValue().getCurrentAggregatedValue().write(output);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to marshall aggregator " +
+ "with IOException " + entry.getKey(), e);
+ }
+ }
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "marshalAggregatorValues: Finished assembling aggregator values");
+ }
+ return outputStream.toByteArray();
+ }
+}
Added: giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java?rev=1393264&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java Wed Oct 3 02:57:50 2012
@@ -0,0 +1,162 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.examples.AggregatorsTestVertex;
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Tests that aggregators are handled properly */
+public class TestAggregatorsHandling extends BspCase {
+
+ public TestAggregatorsHandling() {
+ super(TestAggregatorsHandling.class.getName());
+ }
+
+ /** Tests that aggregators are handled properly during supersteps */
+ @Test
+ public void testAggregatorsHandling() throws IOException,
+ ClassNotFoundException, InterruptedException {
+ GiraphJob job = prepareJob(getCallingMethodName(),
+ AggregatorsTestVertex.class,
+ SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+ job.getConfiguration().setMasterComputeClass(
+ AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+ assertTrue(job.run(true));
+ }
+
+ /** Test that master aggregator serialization round-trips every aggregator */
+ @Test
+ public void testMasterAggregatorsSerialization() throws
+ IllegalAccessException, InstantiationException, IOException {
+ MasterAggregatorHandler handler =
+ new MasterAggregatorHandler(new Configuration());
+
+ String regularAggName = "regular";
+ LongWritable regularValue = new LongWritable(5);
+ handler.registerAggregator(regularAggName, LongSumAggregator.class);
+ handler.setAggregatedValue(regularAggName, regularValue);
+
+ String persistentAggName = "persistent";
+ DoubleWritable persistentValue = new DoubleWritable(10.5);
+ handler.registerPersistentAggregator(persistentAggName,
+ DoubleOverwriteAggregator.class);
+ handler.setAggregatedValue(persistentAggName, persistentValue);
+
+ for (AggregatorWrapper<Writable> aggregator :
+ handler.getAggregatorMap().values()) {
+ aggregator.setPreviousAggregatedValue(
+ aggregator.getCurrentAggregatedValue());
+ }
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ handler.write(new DataOutputStream(out));
+
+ MasterAggregatorHandler restartedHandler =
+ new MasterAggregatorHandler(new Configuration());
+ restartedHandler.readFields(
+ new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
+
+ assertEquals(2, restartedHandler.getAggregatorMap().size());
+
+ AggregatorWrapper<Writable> regularAgg =
+ restartedHandler.getAggregatorMap().get(regularAggName);
+ assertTrue(
+ regularAgg.getAggregatorClass().equals(LongSumAggregator.class));
+ assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
+ assertEquals(regularValue,
+ restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
+ assertFalse(regularAgg.isPersistent());
+
+ AggregatorWrapper<Writable> persistentAgg =
+ restartedHandler.getAggregatorMap().get(persistentAggName);
+ assertTrue(persistentAgg.getAggregatorClass().equals
+ (DoubleOverwriteAggregator.class));
+ assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
+ assertEquals(persistentValue,
+ restartedHandler.<DoubleWritable>getAggregatedValue(persistentAggName));
+ assertTrue(persistentAgg.isPersistent());
+ }
+
+ /**
+ * Test if aggregators are handled properly when restarting from a
+ * checkpoint
+ */
+ @Test
+ public void testAggregatorsCheckpointing() throws ClassNotFoundException,
+ IOException, InterruptedException {
+ Path checkpointsDir = getTempPath("checkPointsForTesting");
+ Path outputPath = getTempPath(getCallingMethodName());
+ GiraphJob job = prepareJob(getCallingMethodName(),
+ AggregatorsTestVertex.class,
+ null,
+ AggregatorsTestVertex.AggregatorsTestMasterCompute.class,
+ SimplePageRankVertex.SimplePageRankVertexInputFormat.class,
+ null,
+ outputPath);
+
+ job.getConfiguration().set(GiraphConfiguration.CHECKPOINT_DIRECTORY,
+ checkpointsDir.toString());
+ job.getConfiguration().setBoolean(
+ GiraphConfiguration.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+ job.getConfiguration().setInt(GiraphConfiguration.CHECKPOINT_FREQUENCY, 4);
+
+ assertTrue(job.run(true));
+
+ // Restart the test from superstep 4
+ System.out.println("testAggregatorsCheckpointing: Restarting from " +
+ "superstep 4 with checkpoint path = " + checkpointsDir);
+ outputPath = getTempPath(getCallingMethodName() + "Restarted");
+ GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+ AggregatorsTestVertex.class,
+ null,
+ AggregatorsTestVertex.AggregatorsTestMasterCompute.class,
+ SimplePageRankVertex.SimplePageRankVertexInputFormat.class,
+ null,
+ outputPath);
+ job.getConfiguration().setMasterComputeClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ restartedJob.getConfiguration().set(
+ GiraphConfiguration.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+ restartedJob.getConfiguration().setLong(
+ GiraphConfiguration.RESTART_SUPERSTEP, 4);
+
+ assertTrue(restartedJob.run(true));
+ }
+}