You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/09 11:10:59 UTC
svn commit: r1371108 [2/2] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/aggregators/
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/
src/main/java/org/apache/giraph/examples/
src/main/java/org/apache/giraph/gra...
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java Thu Aug 9 09:10:57 2012
@@ -36,12 +36,12 @@ public interface Aggregator<A extends Wr
void aggregate(A value);
/**
- * Set aggregated value.
- * Can be used for initialization or reset.
+ * Return a new aggregated value which is neutral to the aggregate operation.
+ * It must be changeable without affecting the internals of the Aggregator.
*
- * @param value Value to be set.
+ * @return Neutral value
*/
- void setAggregatedValue(A value);
+ A createInitialValue();
/**
* Return current aggregated value.
@@ -53,10 +53,15 @@ public interface Aggregator<A extends Wr
A getAggregatedValue();
/**
- * Return new aggregated value.
- * Must be changeable without affecting internals of Aggregator
+ * Set aggregated value.
+ * Can be used for initialization or reset.
*
- * @return Writable
+ * @param value Value to be set.
+ */
+ void setAggregatedValue(A value);
+
+ /**
+ * Reset the value of aggregator to neutral value
*/
- A createAggregatedValue();
+ void reset();
}
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java Thu Aug 9 09:10:57 2012
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wrapper for aggregators. Keeps two instances of an aggregator - one for
+ * the value from previous super step, and one for the value which is being
+ * generated in current super step.
+ *
+ * @param <A> Aggregated value
+ */
+public class AggregatorWrapper<A extends Writable> {
+ /** False iff aggregator should be reset at the end of each super step */
+ private final boolean persistent;
+ /** Value aggregated in previous super step */
+ private A previousAggregatedValue;
+ /** Aggregator for next super step */
+ private final Aggregator<A> currentAggregator;
+ /** Whether anyone changed current value since the moment it was reset */
+ private boolean changed;
+
+ /**
+ * @param aggregatorClass Class type of the aggregator
+ * @param persistent False iff aggregator should be reset at the end of
+ * each super step
+ * @throws IllegalAccessException If the aggregator class is not accessible
+ * @throws InstantiationException If the aggregator cannot be instantiated
+ */
+ public AggregatorWrapper(Class<? extends Aggregator<A>> aggregatorClass,
+ boolean persistent) throws IllegalAccessException,
+ InstantiationException {
+ this.persistent = persistent;
+ currentAggregator = aggregatorClass.newInstance();
+ changed = false;
+ previousAggregatedValue = currentAggregator.createInitialValue();
+ }
+
+ /**
+ * Get aggregated value from previous super step
+ *
+ * @return Aggregated value from previous super step
+ */
+ public A getPreviousAggregatedValue() {
+ return previousAggregatedValue;
+ }
+
+ /**
+ * Set aggregated value for previous super step
+ *
+ * @param value Aggregated value to set
+ */
+ public void setPreviousAggregatedValue(A value) {
+ previousAggregatedValue = value;
+ }
+
+ /**
+ * Check if aggregator is persistent
+ *
+ * @return False iff aggregator should be reset at the end of each super step
+ */
+ public boolean isPersistent() {
+ return persistent;
+ }
+
+ /**
+ * Check if current aggregator was changed
+ *
+ * @return Whether anyone changed current value since the moment it was reset
+ */
+ public boolean isChanged() {
+ return changed;
+ }
+
+ /**
+ * Add a new value to current aggregator
+ *
+ * @param value Value to be aggregated
+ */
+ public void aggregateCurrent(A value) {
+ changed = true;
+ currentAggregator.aggregate(value);
+ }
+
+ /**
+ * Get current aggregated value
+ *
+ * @return Current aggregated value
+ */
+ public A getCurrentAggregatedValue() {
+ return currentAggregator.getAggregatedValue();
+ }
+
+ /**
+ * Set aggregated value of current aggregator
+ *
+ * @param value Value to set it to
+ */
+ public void setCurrentAggregatedValue(A value) {
+ changed = true;
+ currentAggregator.setAggregatedValue(value);
+ }
+
+ /**
+ * Reset the value of current aggregator to neutral value
+ */
+ public void resetCurrentAggregator() {
+ changed = false;
+ currentAggregator.reset();
+ }
+
+ /**
+ * Return new aggregated value which is neutral to aggregate operation
+ *
+ * @return Neutral value
+ */
+ public A createInitialValue() {
+ return currentAggregator.createInitialValue();
+ }
+
+ /**
+ * Get class of wrapped aggregator
+ *
+ * @return Aggregator class
+ */
+ public Class<? extends Aggregator> getAggregatorClass() {
+ return currentAggregator.getClass();
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java Thu Aug 9 09:10:57 2012
@@ -19,7 +19,7 @@
package org.apache.giraph.graph;
import java.io.IOException;
-import java.util.Map;
+import java.util.Map.Entry;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.Mappe
* An AggregatorWriter is used to export Aggregators during or at the end of
* each computation. It runs on the master and it's called at the end of each
* superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is
- * passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the
+ * passed to {@link AggregatorWriter#writeAggregator(Iterable, long)} as the
* superstep value to signal the end of computation.
*/
public interface AggregatorWriter {
@@ -53,19 +53,19 @@ public interface AggregatorWriter {
* whether to write the aggregators values for the current superstep. For
* the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed.
*
- * @param aggregatorMap Map of aggregators to write
+ * @param aggregatorMap Iterable of (name, value) entries, one per aggregator
* @param superstep Current superstep
* @throws IOException
*/
void writeAggregator(
- Map<String, Aggregator<Writable>> aggregatorMap,
+ Iterable<Entry<String, Writable>> aggregatorMap,
long superstep) throws IOException;
/**
* The method is called at the end of a successful computation. The method
* is not called when the job fails and a new master is elected. For this
* reason it's advised to flush data at the end of
- * {@link AggregatorWriter#writeAggregator(Map, long)}.
+ * {@link AggregatorWriter#writeAggregator(Iterable, long)}.
*
* @throws IOException
*/
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Thu Aug 9 09:10:57 2012
@@ -259,8 +259,8 @@ public abstract class BspService<I exten
/** Checkpoint frequency */
private final int checkpointFrequency;
/** Map of aggregators */
- private Map<String, Aggregator<Writable>> aggregatorMap =
- new TreeMap<String, Aggregator<Writable>>();
+ private Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+ new TreeMap<String, AggregatorWrapper<Writable>>();
/**
* Constructor.
@@ -884,22 +884,23 @@ public abstract class BspService<I exten
* @param <A> Aggregator type
* @param name Name of the aggregator
* @param aggregatorClass Class of the aggregator
+ * @param persistent False iff aggregator should be reset at the end of
+ * every super step
* @return Aggregator
* @throws IllegalAccessException
* @throws InstantiationException
*/
- public final <A extends Writable> Aggregator<A> registerAggregator(
- String name,
- Class<? extends Aggregator<A>> aggregatorClass)
- throws InstantiationException, IllegalAccessException {
+ protected <A extends Writable> AggregatorWrapper<A> registerAggregator(
+ String name, Class<? extends Aggregator<A>> aggregatorClass,
+ boolean persistent) throws InstantiationException,
+ IllegalAccessException {
if (aggregatorMap.get(name) != null) {
return null;
}
- Aggregator<A> aggregator =
- (Aggregator<A>) aggregatorClass.newInstance();
- @SuppressWarnings("unchecked")
- Aggregator<Writable> writableAggregator =
- (Aggregator<Writable>) aggregator;
+ AggregatorWrapper<A> aggregator =
+ new AggregatorWrapper<A>(aggregatorClass, persistent);
+ AggregatorWrapper<Writable> writableAggregator =
+ (AggregatorWrapper<Writable>) aggregator;
aggregatorMap.put(name, writableAggregator);
if (LOG.isInfoEnabled()) {
LOG.info("registerAggregator: registered " + name);
@@ -913,16 +914,32 @@ public abstract class BspService<I exten
* @param name Name of aggregator
* @return Aggregator or null when not registered
*/
- public final Aggregator<? extends Writable> getAggregator(String name) {
+ protected AggregatorWrapper<? extends Writable> getAggregator(String name) {
return aggregatorMap.get(name);
}
/**
+ * Get value of an aggregator.
+ *
+ * @param name Name of aggregator
+ * @param <A> Aggregated value
+ * @return Value of the aggregator
+ */
+ public <A extends Writable> A getAggregatedValue(String name) {
+ AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+ if (aggregator == null) {
+ return null;
+ } else {
+ return (A) aggregator.getPreviousAggregatedValue();
+ }
+ }
+
+ /**
* Get the aggregator map.
*
* @return Map of aggregator names to aggregator
*/
- public Map<String, Aggregator<Writable>> getAggregatorMap() {
+ protected Map<String, AggregatorWrapper<Writable>> getAggregatorMap() {
return aggregatorMap;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Aug 9 09:10:57 2012
@@ -48,7 +48,6 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -60,7 +59,7 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -68,11 +67,15 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
/**
- * ZooKeeper-based implementation of {@link CentralizedService}.
+ * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
*
* @param <I> Vertex id
* @param <V> Vertex data
@@ -869,7 +872,7 @@ public class BspServiceMaster<I extends
for (String hostnameIdPath : hostnameIdPathList) {
JSONObject workerFinishedInfoObj = null;
- JSONArray aggregatorArray = null;
+ byte[] aggregatorArray = null;
try {
byte [] zkData =
getZkExt().getData(hostnameIdPath, false, null);
@@ -886,89 +889,57 @@ public class BspServiceMaster<I extends
"collectAndProcessAggregatorValues: JSONException", e);
}
try {
- aggregatorArray = workerFinishedInfoObj.getJSONArray(
- JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY);
+ aggregatorArray = Base64.decode(workerFinishedInfoObj.getString(
+ JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY));
} catch (JSONException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("collectAndProcessAggregatorValues: " +
"No aggregators" + " for " + hostnameIdPath);
}
continue;
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: IOException", e);
}
- for (int i = 0; i < aggregatorArray.length(); ++i) {
- try {
- if (LOG.isInfoEnabled()) {
- LOG.info("collectAndProcessAggregatorValues: " +
- "Getting aggregators from " +
- aggregatorArray.getJSONObject(i));
- }
- String aggregatorName =
- aggregatorArray.getJSONObject(i).getString(
- AGGREGATOR_NAME_KEY);
- String aggregatorClassName =
- aggregatorArray.getJSONObject(i).getString(
- AGGREGATOR_CLASS_NAME_KEY);
- @SuppressWarnings("unchecked")
- Aggregator<Writable> aggregator =
- (Aggregator<Writable>) getAggregator(aggregatorName);
- boolean firstTime = false;
+
+ DataInputStream input =
+ new DataInputStream(new ByteArrayInputStream(aggregatorArray));
+ try {
+ while (input.available() > 0) {
+ String aggregatorName = input.readUTF();
+ AggregatorWrapper<Writable> aggregator =
+ getAggregatorMap().get(aggregatorName);
if (aggregator == null) {
- @SuppressWarnings("unchecked")
- Class<? extends Aggregator<Writable>> aggregatorClass =
- (Class<? extends Aggregator<Writable>>)
- Class.forName(aggregatorClassName);
- aggregator = registerAggregator(
- aggregatorName,
- aggregatorClass);
- firstTime = true;
- }
- Writable aggregatorValue =
- aggregator.createAggregatedValue();
- InputStream input =
- new ByteArrayInputStream(
- Base64.decode(
- aggregatorArray.getJSONObject(i).
- getString(AGGREGATOR_VALUE_KEY)));
- aggregatorValue.readFields(new DataInputStream(input));
- if (LOG.isDebugEnabled()) {
- LOG.debug("collectAndProcessAggregatorValues: " +
- "aggregator value size=" + input.available() +
- " for aggregator=" + aggregatorName +
- " value=" + aggregatorValue);
- }
- if (firstTime) {
- aggregator.setAggregatedValue(aggregatorValue);
- } else {
- aggregator.aggregate(aggregatorValue);
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "Master received aggregator which isn't registered: " +
+ aggregatorName);
}
- } catch (IOException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "IOException when reading aggregator data " +
- aggregatorArray, e);
- } catch (JSONException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "JSONException when reading aggregator data " +
- aggregatorArray, e);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "ClassNotFoundException when reading aggregator data " +
- aggregatorArray, e);
- } catch (InstantiationException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "InstantiationException when reading aggregator data " +
- aggregatorArray, e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "IOException when reading aggregator data " +
- aggregatorArray, e);
+ Writable aggregatorValue = aggregator.createInitialValue();
+ aggregatorValue.readFields(input);
+ aggregator.aggregateCurrent(aggregatorValue);
}
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "IOException when reading aggregator data", e);
}
}
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("collectAndProcessAggregatorValues: Processed aggregators");
+ }
+
+ // prepare aggregators for master compute
+ for (AggregatorWrapper<Writable> aggregator :
+ getAggregatorMap().values()) {
+ if (aggregator.isPersistent()) {
+ aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
+ }
+ aggregator.setPreviousAggregatedValue(
+ aggregator.getCurrentAggregatedValue());
+ aggregator.resetCurrentAggregator();
+ }
}
/**
@@ -977,48 +948,45 @@ public class BspServiceMaster<I extends
* @param superstep superstep for which to save values
*/
private void saveAggregatorValues(long superstep) {
- Map<String, Aggregator<Writable>> aggregatorMap = getAggregatorMap();
+ Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+ getAggregatorMap();
+
+ for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
+ if (aggregator.isChanged()) {
+ // if master compute changed the value, use the one it chose
+ aggregator.setPreviousAggregatedValue(
+ aggregator.getCurrentAggregatedValue());
+ // reset aggregator for the next superstep
+ aggregator.resetCurrentAggregator();
+ }
+ }
+
if (aggregatorMap.size() > 0) {
String mergedAggregatorPath =
getMergedAggregatorPath(getApplicationAttempt(), superstep);
- byte [] zkData = null;
- JSONArray aggregatorArray = new JSONArray();
- for (Map.Entry<String, Aggregator<Writable>> entry :
- aggregatorMap.entrySet()) {
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutput output = new DataOutputStream(outputStream);
+ try {
+ output.writeInt(aggregatorMap.size());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+ aggregatorMap.entrySet()) {
try {
- ByteArrayOutputStream outputStream =
- new ByteArrayOutputStream();
- DataOutput output = new DataOutputStream(outputStream);
- entry.getValue().getAggregatedValue().write(output);
-
- JSONObject aggregatorObj = new JSONObject();
- aggregatorObj.put(AGGREGATOR_NAME_KEY,
- entry.getKey());
- aggregatorObj.put(
- AGGREGATOR_VALUE_KEY,
- Base64.encodeBytes(outputStream.toByteArray()));
- aggregatorArray.put(aggregatorObj);
- if (LOG.isInfoEnabled()) {
- LOG.info("saveAggregatorValues: " +
- "Trying to add aggregatorObj " +
- aggregatorObj + "(" +
- entry.getValue().getAggregatedValue() +
- ") to merged aggregator path " +
- mergedAggregatorPath);
- }
+ output.writeUTF(entry.getKey());
+ output.writeUTF(entry.getValue().getAggregatorClass().getName());
+ entry.getValue().getPreviousAggregatedValue().write(output);
} catch (IOException e) {
- throw new IllegalStateException(
- "saveAggregatorValues: " +
- "IllegalStateException", e);
- } catch (JSONException e) {
- throw new IllegalStateException(
- "saveAggregatorValues: JSONException", e);
+ throw new IllegalStateException("saveAggregatorValues: " +
+ "IllegalStateException", e);
}
}
+
try {
- zkData = aggregatorArray.toString().getBytes();
getZkExt().createExt(mergedAggregatorPath,
- zkData,
+ outputStream.toByteArray(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
@@ -1034,10 +1002,8 @@ public class BspServiceMaster<I extends
e);
}
if (LOG.isInfoEnabled()) {
- LOG.info("saveAggregatorValues: Finished " +
- "loading " +
- mergedAggregatorPath + " with aggregator values " +
- aggregatorArray);
+ LOG.info("saveAggregatorValues: Finished loading " +
+ mergedAggregatorPath);
}
}
}
@@ -1534,7 +1500,20 @@ public class BspServiceMaster<I extends
superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
}
try {
- aggregatorWriter.writeAggregator(getAggregatorMap(),
+ Iterable<Map.Entry<String, Writable>> iter =
+ Iterables.transform(
+ getAggregatorMap().entrySet(),
+ new Function<Entry<String, AggregatorWrapper<Writable>>,
+ Entry<String, Writable>>() {
+ @Override
+ public Entry<String, Writable> apply(
+ Entry<String, AggregatorWrapper<Writable>> entry) {
+ return new AbstractMap.SimpleEntry<String,
+ Writable>(entry.getKey(),
+ entry.getValue().getPreviousAggregatedValue());
+ }
+ });
+ aggregatorWriter.writeAggregator(iter,
(superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
AggregatorWriter.LAST_SUPERSTEP : getSuperstep());
} catch (IOException e) {
@@ -1787,14 +1766,28 @@ public class BspServiceMaster<I extends
return foundEvent;
}
- /**
- * Use an aggregator in this superstep. Note that the master uses all
- * aggregators by default, so calling this function is not neccessary.
- *
- * @param name Name of aggregator (should be unique)
- * @return boolean (always true)
- */
- public boolean useAggregator(String name) {
- return true;
+ @Override
+ public <A extends Writable> boolean registerAggregator(String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException {
+ return registerAggregator(name, aggregatorClass, false) != null;
+ }
+
+ @Override
+ public <A extends Writable> boolean registerPersistentAggregator(String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException {
+ return registerAggregator(name, aggregatorClass, true) != null;
+ }
+
+ @Override
+ public <A extends Writable> void setAggregatedValue(String name, A value) {
+ AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+ if (aggregator == null) {
+ throw new IllegalStateException(
+ "setAggregatedValue: Tried to set value of aggregator which wasn't" +
+ " registered " + name);
+ }
+ ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Aug 9 09:10:57 2012
@@ -57,11 +57,11 @@ import net.iharder.Base64;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -71,7 +71,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.TreeSet;
/**
* ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
@@ -92,8 +91,6 @@ public class BspServiceWorker<I extends
private int inputSplitCount = -1;
/** My process health znode */
private String myHealthZnode;
- /** List of aggregators currently in use */
- private Set<String> aggregatorInUse = new TreeSet<String>();
/** Worker info */
private final WorkerInfo workerInfo;
/** Worker graph partitioner */
@@ -130,7 +127,6 @@ public class BspServiceWorker<I extends
* @param context Mapper context
* @param graphMapper Graph mapper
* @param graphState Global graph state
- * @throws UnknownHostException
* @throws IOException
* @throws InterruptedException
*/
@@ -194,21 +190,6 @@ public class BspServiceWorker<I extends
}
/**
- * Use an aggregator in this superstep.
- *
- * @param name Name of aggregator (should be unique)
- * @return boolean (false when aggregator not registered)
- */
- public boolean useAggregator(String name) {
- if (getAggregatorMap().get(name) == null) {
- LOG.error("userAggregator: Aggregator=" + name + " not registered");
- return false;
- }
- aggregatorInUse.add(name);
- return true;
- }
-
- /**
* Try to reserve an InputSplit for loading. While InputSplits exists that
* are not finished, wait until they are.
*
@@ -692,57 +673,49 @@ public class BspServiceWorker<I extends
finishSuperstep(partitionStatsList);
}
+ @Override
+ public <A extends Writable> void aggregate(String name, A value) {
+ AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+ if (aggregator != null) {
+ ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
+ } else {
+ throw new IllegalStateException("aggregate: Tried to aggregate value " +
+ "to unregistered aggregator " + name);
+ }
+ }
+
/**
- * Marshal the aggregator values of to a JSONArray that will later be
- * aggregated by master. Reset the 'use' of aggregators in the next
- * superstep
+ * Marshal the aggregator values of the worker to a byte array that will
+ * later be aggregated by master.
*
* @param superstep Superstep to marshall on
- * @return JSON array of the aggreagtor values
+ * @return Byte array of the aggregator values
*/
- private JSONArray marshalAggregatorValues(long superstep) {
- JSONArray aggregatorArray = new JSONArray();
- if ((superstep == INPUT_SUPERSTEP) || (aggregatorInUse.size() == 0)) {
- return aggregatorArray;
+ private byte[] marshalAggregatorValues(long superstep) {
+ if (superstep == INPUT_SUPERSTEP) {
+ return new byte[0];
}
- for (String name : aggregatorInUse) {
- try {
- Aggregator<Writable> aggregator = getAggregatorMap().get(name);
- ByteArrayOutputStream outputStream =
- new ByteArrayOutputStream();
- DataOutput output = new DataOutputStream(outputStream);
- aggregator.getAggregatedValue().write(output);
-
- JSONObject aggregatorObj = new JSONObject();
- aggregatorObj.put(AGGREGATOR_NAME_KEY, name);
- aggregatorObj.put(AGGREGATOR_CLASS_NAME_KEY,
- aggregator.getClass().getName());
- aggregatorObj.put(
- AGGREGATOR_VALUE_KEY,
- Base64.encodeBytes(outputStream.toByteArray()));
- aggregatorArray.put(aggregatorObj);
- if (LOG.isInfoEnabled()) {
- LOG.info("marshalAggregatorValues: " +
- "Found aggregatorObj " +
- aggregatorObj + ", value (" +
- aggregator.getAggregatedValue() + ")");
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(outputStream);
+ for (Entry<String, AggregatorWrapper<Writable>> entry :
+ getAggregatorMap().entrySet()) {
+ if (entry.getValue().isChanged()) {
+ try {
+ output.writeUTF(entry.getKey());
+ entry.getValue().getCurrentAggregatedValue().write(output);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to marshall aggregator " +
+ "with IOException " + entry.getKey(), e);
}
- } catch (JSONException e) {
- throw new IllegalStateException("Failed to marshall aggregator " +
- "with JSONException " + name, e);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to marshall aggregator " +
- "with IOException " + name, e);
}
}
if (LOG.isInfoEnabled()) {
- LOG.info("marshalAggregatorValues: Finished assembling " +
- "aggregator values in JSONArray - " + aggregatorArray);
+ LOG.info(
+ "marshalAggregatorValues: Finished assembling aggregator values");
}
- aggregatorInUse.clear();
- return aggregatorArray;
+ return outputStream.toByteArray();
}
/**
@@ -751,13 +724,18 @@ public class BspServiceWorker<I extends
* @param superstep Superstep to get the aggregated values from
*/
private void getAggregatorValues(long superstep) {
+ // prepare aggregators for reading and next superstep
+ for (AggregatorWrapper<Writable> aggregator :
+ getAggregatorMap().values()) {
+ aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
+ aggregator.resetCurrentAggregator();
+ }
String mergedAggregatorPath =
getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
- JSONArray aggregatorArray = null;
+
+ byte[] aggregatorArray = null;
try {
- byte[] zkData =
- getZkExt().getData(mergedAggregatorPath, false, null);
- aggregatorArray = new JSONArray(new String(zkData));
+ aggregatorArray = getZkExt().getData(mergedAggregatorPath, false, null);
} catch (KeeperException.NoNodeException e) {
LOG.info("getAggregatorValues: no aggregators in " +
mergedAggregatorPath + " on superstep " + superstep);
@@ -768,49 +746,58 @@ public class BspServiceWorker<I extends
} catch (InterruptedException e) {
throw new IllegalStateException("Failed to get data for " +
mergedAggregatorPath + " with InterruptedException", e);
- } catch (JSONException e) {
- throw new IllegalStateException("Failed to get data for " +
- mergedAggregatorPath + " with JSONException", e);
}
- for (int i = 0; i < aggregatorArray.length(); ++i) {
+
+ DataInput input =
+ new DataInputStream(new ByteArrayInputStream(aggregatorArray));
+ int numAggregators = 0;
+
+ try {
+ numAggregators = input.readInt();
+ } catch (IOException e) {
+ throw new IllegalStateException("getAggregatorValues: " +
+ "Failed to decode data", e);
+ }
+
+ for (int i = 0; i < numAggregators; i++) {
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("getAggregatorValues: " +
- "Getting aggregators from " +
- aggregatorArray.getJSONObject(i));
- }
- String aggregatorName = aggregatorArray.getJSONObject(i).
- getString(AGGREGATOR_NAME_KEY);
- Aggregator<Writable> aggregator =
+ String aggregatorName = input.readUTF();
+ String aggregatorClassName = input.readUTF();
+ AggregatorWrapper<Writable> aggregatorWrapper =
getAggregatorMap().get(aggregatorName);
- if (aggregator == null) {
- continue;
- }
- Writable aggregatorValue = aggregator.getAggregatedValue();
- InputStream input =
- new ByteArrayInputStream(
- Base64.decode(aggregatorArray.getJSONObject(i).
- getString(AGGREGATOR_VALUE_KEY)));
- aggregatorValue.readFields(
- new DataInputStream(input));
- aggregator.setAggregatedValue(aggregatorValue);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getAggregatorValues: " +
- "Got aggregator=" + aggregatorName + " value=" +
- aggregatorValue);
+ if (aggregatorWrapper == null) {
+ try {
+ Class<? extends Aggregator<Writable>> aggregatorClass =
+ (Class<? extends Aggregator<Writable>>)
+ Class.forName(aggregatorClassName);
+ aggregatorWrapper =
+ registerAggregator(aggregatorName, aggregatorClass, false);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Failed to create aggregator " +
+ aggregatorName + " of class " + aggregatorClassName +
+ " with ClassNotFoundException", e);
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("Failed to create aggregator " +
+ aggregatorName + " of class " + aggregatorClassName +
+ " with InstantiationException", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Failed to create aggregator " +
+ aggregatorName + " of class " + aggregatorClassName +
+ " with IllegalAccessException", e);
+ }
}
- } catch (JSONException e) {
- throw new IllegalStateException("Failed to decode data for index " +
- i + " with KeeperException", e);
+ Writable aggregatorValue = aggregatorWrapper.createInitialValue();
+ aggregatorValue.readFields(input);
+ aggregatorWrapper.setPreviousAggregatedValue(aggregatorValue);
} catch (IOException e) {
- throw new IllegalStateException("Failed to decode data for index " +
- i + " with KeeperException", e);
+ throw new IllegalStateException(
+ "Failed to decode data for index " + i, e);
}
}
+
if (LOG.isInfoEnabled()) {
LOG.info("getAggregatorValues: Finished loading " +
- mergedAggregatorPath + " with aggregator values " +
- aggregatorArray);
+ mergedAggregatorPath);
}
}
@@ -988,7 +975,7 @@ public class BspServiceWorker<I extends
MemoryUtils.getRuntimeMemoryStats());
}
- JSONArray aggregatorValueArray =
+ byte[] aggregatorArray =
marshalAggregatorValues(getSuperstep());
Collection<PartitionStats> finalizedPartitionStats =
workerGraphPartitioner.finalizePartitionStats(
@@ -1000,7 +987,7 @@ public class BspServiceWorker<I extends
JSONObject workerFinishedInfoObj = new JSONObject();
try {
workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
- aggregatorValueArray);
+ Base64.encodeBytes(aggregatorArray));
workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
Base64.encodeBytes(partitionStatsBytes));
workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Aug 9 09:10:57 2012
@@ -111,19 +111,21 @@ public class GraphMapper<I extends Writa
}
/**
- * Get the aggregator usage, a subset of the functionality
+ * Get worker aggregator usage, a subset of the functionality
*
- * @return Aggregator usage interface
+ * @return Worker aggregator usage interface
*/
- public final AggregatorUsage getAggregatorUsage() {
- AggregatorUsage result = null;
- if (serviceWorker != null) {
- result = serviceWorker;
- }
- if (serviceMaster != null) {
- result = serviceMaster;
- }
- return result;
+ public final WorkerAggregatorUsage getWorkerAggregatorUsage() {
+ return serviceWorker;
+ }
+
+ /**
+ * Get master aggregator usage, a subset of the functionality
+ *
+ * @return Master aggregator usage interface
+ */
+ public final MasterAggregatorUsage getMasterAggregatorUsage() {
+ return serviceMaster;
}
public final WorkerContext getWorkerContext() {
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java Thu Aug 9 09:10:57 2012
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Master compute can access and change aggregators through this interface
+ */
+public interface MasterAggregatorUsage {
+ /**
+ * Register an aggregator in preSuperstep() and/or preApplication(). This
+ * aggregator will have its value reset at the end of each super step.
+ *
+ * @param name of aggregator
+ * @param aggregatorClass Class type of the aggregator
+ * @param <A> Type of the aggregated value
+ * @return True iff aggregator wasn't already registered
+ */
+ <A extends Writable> boolean registerAggregator(String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException;
+
+ /**
+ * Register a persistent aggregator in preSuperstep() and/or
+ * preApplication(). This aggregator will not have its value reset at the
+ * end of each super step.
+ *
+ * @param name of aggregator
+ * @param aggregatorClass Class type of the aggregator
+ * @param <A> Type of the aggregated value
+ * @return True iff aggregator wasn't already registered
+ */
+ <A extends Writable> boolean registerPersistentAggregator(String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException;
+
+ /**
+ * Get value of an aggregator.
+ *
+ * @param name Name of aggregator
+ * @param <A> Type of the aggregated value
+ * @return Value of the aggregator
+ */
+ <A extends Writable> A getAggregatedValue(String name);
+
+ /**
+ * Sets value of an aggregator.
+ *
+ * @param name Name of aggregator
+ * @param value Value to set
+ * @param <A> Type of the aggregated value
+ */
+ <A extends Writable> void setAggregatedValue(String name, A value);
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java Thu Aug 9 09:10:57 2012
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.Mappe
* not have to be called.
*/
@SuppressWarnings("rawtypes")
-public abstract class MasterCompute implements AggregatorUsage, Writable,
+public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
Configurable {
/** If true, do not do anymore computation on this vertex. */
private boolean halt = false;
@@ -135,23 +135,32 @@ public abstract class MasterCompute impl
}
@Override
- public final <A extends Writable> Aggregator<A> registerAggregator(
+ public final <A extends Writable> boolean registerAggregator(
String name, Class<? extends Aggregator<A>> aggregatorClass)
throws InstantiationException, IllegalAccessException {
- return getGraphState().getGraphMapper().getAggregatorUsage().
+ return getGraphState().getGraphMapper().getMasterAggregatorUsage().
registerAggregator(name, aggregatorClass);
}
@Override
- public final Aggregator<? extends Writable> getAggregator(String name) {
- return getGraphState().getGraphMapper().getAggregatorUsage().
- getAggregator(name);
+ public final <A extends Writable> boolean registerPersistentAggregator(
+ String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException {
+ return getGraphState().getGraphMapper().getMasterAggregatorUsage().
+ registerPersistentAggregator(name, aggregatorClass);
}
@Override
- public final boolean useAggregator(String name) {
- return getGraphState().getGraphMapper().getAggregatorUsage().
- useAggregator(name);
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return getGraphState().getGraphMapper().getMasterAggregatorUsage().
+ getAggregatedValue(name);
+ }
+
+ @Override
+ public <A extends Writable> void setAggregatedValue(String name, A value) {
+ getGraphState().getGraphMapper().getMasterAggregatorUsage().
+ setAggregatedValue(name, value);
}
@Override
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java Thu Aug 9 09:10:57 2012
@@ -19,7 +19,6 @@
package org.apache.giraph.graph;
import java.io.IOException;
-import java.util.Map;
import java.util.Map.Entry;
import com.google.common.base.Charsets;
@@ -77,13 +76,13 @@ public class TextAggregatorWriter implem
}
@Override
- public final void writeAggregator(
- Map<String, Aggregator<Writable>> aggregators,
+ public void writeAggregator(
+ Iterable<Entry<String, Writable>> aggregatorMap,
long superstep) throws IOException {
if (shouldWrite(superstep)) {
- for (Entry<String, Aggregator<Writable>> a: aggregators.entrySet()) {
- byte[] bytes = aggregatorToString(a.getKey(), a.getValue(), superstep)
- .getBytes(Charsets.UTF_8);
+ for (Entry<String, Writable> entry : aggregatorMap) {
+ byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
+ superstep).getBytes(Charsets.UTF_8);
output.write(bytes, 0, bytes.length);
}
output.flush();
@@ -95,17 +94,15 @@ public class TextAggregatorWriter implem
* Override this if you want to implement your own text format.
*
* @param aggregatorName Name of the aggregator
- * @param a Aggregator
+ * @param value Value of aggregator
* @param superstep Current superstep
* @return The String representation for the aggregator
*/
protected String aggregatorToString(String aggregatorName,
- Aggregator<Writable> a,
+ Writable value,
long superstep) {
-
return new StringBuilder("superstep=").append(superstep).append("\t")
- .append(aggregatorName).append("=").append(a.getAggregatedValue())
- .append("\t").append(a.getClass().getCanonicalName()).append("\n")
+ .append(aggregatorName).append("=").append(value).append("\n")
.toString();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Thu Aug 9 09:10:57 2012
@@ -45,7 +45,7 @@ import java.util.Map;
@SuppressWarnings("rawtypes")
public abstract class Vertex<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements AggregatorUsage, Writable, Configurable {
+ implements WorkerAggregatorUsage, Writable, Configurable {
/** Vertex id. */
private I id;
/** Vertex value. */
@@ -315,23 +315,15 @@ public abstract class Vertex<I extends W
}
@Override
- public final <A extends Writable> Aggregator<A> registerAggregator(
- String name, Class<? extends Aggregator<A>> aggregatorClass)
- throws InstantiationException, IllegalAccessException {
- return getGraphState().getGraphMapper().getAggregatorUsage().
- registerAggregator(name, aggregatorClass);
+ public <A extends Writable> void aggregate(String name, A value) {
+ getGraphState().getGraphMapper().getWorkerAggregatorUsage().
+ aggregate(name, value);
}
@Override
- public final Aggregator<? extends Writable> getAggregator(String name) {
- return getGraphState().getGraphMapper().getAggregatorUsage().
- getAggregator(name);
- }
-
- @Override
- public final boolean useAggregator(String name) {
- return getGraphState().getGraphMapper().getAggregatorUsage().
- useAggregator(name);
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return getGraphState().getGraphMapper().getWorkerAggregatorUsage().
+ getAggregatedValue(name);
}
@Override
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java Thu Aug 9 09:10:57 2012
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vertex classes can access and change aggregators through this interface
+ */
+public interface WorkerAggregatorUsage {
+ /**
+ * Aggregate a new value into the named aggregator
+ *
+ * @param name Name of aggregator
+ * @param value Value to add
+ * @param <A> Type of the aggregated value
+ */
+ <A extends Writable> void aggregate(String name, A value);
+
+ /**
+ * Get value of an aggregator.
+ *
+ * @param name Name of aggregator
+ * @param <A> Type of the aggregated value
+ * @return Value of the aggregator
+ */
+ <A extends Writable> A getAggregatedValue(String name);
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Thu Aug 9 09:10:57 2012
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Mappe
* on a per-worker basis. There's one WorkerContext per worker.
*/
@SuppressWarnings("rawtypes")
-public abstract class WorkerContext implements AggregatorUsage {
+public abstract class WorkerContext implements WorkerAggregatorUsage {
/** Global graph state */
private GraphState graphState;
@@ -110,23 +110,14 @@ public abstract class WorkerContext impl
}
@Override
- public final <A extends Writable> Aggregator<A> registerAggregator(
- String name,
- Class<? extends Aggregator<A>> aggregatorClass)
- throws InstantiationException, IllegalAccessException {
- return graphState.getGraphMapper().getAggregatorUsage().
- registerAggregator(name, aggregatorClass);
+ public <A extends Writable> void aggregate(String name, A value) {
+ graphState.getGraphMapper().getWorkerAggregatorUsage().
+ aggregate(name, value);
}
@Override
- public final Aggregator<? extends Writable> getAggregator(String name) {
- return graphState.getGraphMapper().getAggregatorUsage().
- getAggregator(name);
- }
-
- @Override
- public final boolean useAggregator(String name) {
- return graphState.getGraphMapper().getAggregatorUsage().
- useAggregator(name);
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return graphState.getGraphMapper().getWorkerAggregatorUsage().
+ getAggregatedValue(name);
}
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/BspCase.java Thu Aug 9 09:10:57 2012
@@ -174,6 +174,27 @@ public class BspCase implements Watcher
protected GiraphJob prepareJob(String name, Class<?> vertexClass,
Class<?> workerContextClass, Class<?> vertexInputFormatClass,
Class<?> vertexOutputFormatClass, Path outputPath) throws IOException {
+ return prepareJob(name, vertexClass, workerContextClass, null,
+ vertexInputFormatClass, vertexOutputFormatClass, outputPath);
+ }
+
+ /**
+ * Prepare a GiraphJob for test purposes
+ *
+ * @param name identifying name for the job
+ * @param vertexClass class of the vertex to run
+ * @param workerContextClass class of the workercontext to use
+ * @param masterComputeClass class of mastercompute to use
+ * @param vertexInputFormatClass inputformat to use
+ * @param vertexOutputFormatClass outputformat to use
+ * @param outputPath destination path for the output
+ * @return fully configured job instance
+ * @throws IOException
+ */
+ protected GiraphJob prepareJob(String name, Class<?> vertexClass,
+ Class<?> workerContextClass, Class<?> masterComputeClass,
+ Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
+ Path outputPath) throws IOException {
GiraphJob job = new GiraphJob(name);
setupConfiguration(job);
job.setVertexClass(vertexClass);
@@ -182,6 +203,9 @@ public class BspCase implements Watcher
if (workerContextClass != null) {
job.setWorkerContextClass(workerContextClass);
}
+ if (masterComputeClass != null) {
+ job.setMasterComputeClass(masterComputeClass);
+ }
if (vertexOutputFormatClass != null) {
job.setVertexOutputFormatClass(vertexOutputFormatClass);
}
Added: giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java Thu Aug 9 09:10:57 2012
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.examples.AggregatorsTestVertex;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+/** Tests whether aggregators are handled properly during supersteps */
+public class TestAggregatorsHandling extends BspCase {
+
+ public TestAggregatorsHandling() {
+ super(TestAggregatorsHandling.class.getName());
+ }
+
+ @Test
+ public void testAggregatorsHandling() throws IOException,
+ ClassNotFoundException, InterruptedException {
+ GiraphJob job = prepareJob(getCallingMethodName(),
+ AggregatorsTestVertex.class,
+ SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+ job.setMasterComputeClass(
+ AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+ job.getConfiguration().setBoolean(GiraphJob.USE_NETTY, true);
+ assertTrue(job.run(true));
+ }
+}
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java Thu Aug 9 09:10:57 2012
@@ -59,6 +59,7 @@ public class TestAutoCheckpoint extends
GiraphJob job = prepareJob(getCallingMethodName(),
SimpleCheckpointVertex.class,
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
SimpleSuperstepVertexInputFormat.class,
SimpleSuperstepVertexOutputFormat.class,
outputPath);
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Thu Aug 9 09:10:57 2012
@@ -301,6 +301,8 @@ public class TestBspBasic extends BspCas
SimplePageRankVertex.class, SimplePageRankVertexInputFormat.class);
job.setWorkerContextClass(
SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+ job.setMasterComputeClass(
+ SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
assertTrue(job.run(true));
if (!runningInDistributedMode()) {
double maxPageRank =
@@ -362,6 +364,8 @@ public class TestBspBasic extends BspCas
outputPath);
job.setWorkerContextClass(
SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+ job.setMasterComputeClass(
+ SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
Configuration conf = job.getConfiguration();
@@ -401,23 +405,23 @@ public class TestBspBasic extends BspCas
String[] tokens = line.split("\t");
int superstep = Integer.parseInt(tokens[0].split("=")[1]);
String value = (tokens[1].split("=")[1]);
- String aggregatorName = tokens[2];
+ String aggregatorName = (tokens[1].split("=")[0]);
- if (DoubleMinAggregator.class.getName().equals(aggregatorName)) {
+ if ("min".equals(aggregatorName)) {
minValues.put(superstep, Double.parseDouble(value));
}
- if (DoubleMaxAggregator.class.getName().equals(aggregatorName)) {
+ if ("max".equals(aggregatorName)) {
maxValues.put(superstep, Double.parseDouble(value));
}
- if (LongSumAggregator.class.getName().equals(aggregatorName)) {
+ if ("sum".equals(aggregatorName)) {
vertexCounts.put(superstep, Long.parseLong(value));
}
}
int maxSuperstep = SimplePageRankVertex.MAX_SUPERSTEPS;
- assertEquals(maxSuperstep + 1, minValues.size());
- assertEquals(maxSuperstep + 1, maxValues.size());
- assertEquals(maxSuperstep + 1, vertexCounts.size());
+ assertEquals(maxSuperstep + 2, minValues.size());
+ assertEquals(maxSuperstep + 2, maxValues.size());
+ assertEquals(maxSuperstep + 2, vertexCounts.size());
assertEquals(maxPageRank, maxValues.get(maxSuperstep));
assertEquals(minPageRank, minValues.get(maxSuperstep));
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java Thu Aug 9 09:10:57 2012
@@ -77,6 +77,7 @@ public class TestGraphPartitioner extend
GiraphJob job = prepareJob("testVertexBalancer",
SimpleCheckpointVertex.class,
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
SimpleSuperstepVertexInputFormat.class,
SimpleSuperstepVertexOutputFormat.class, outputPath);
@@ -91,6 +92,7 @@ public class TestGraphPartitioner extend
outputPath = getTempPath("testHashPartitioner");
job = prepareJob("testHashPartitioner", SimpleCheckpointVertex.class,
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
SimpleSuperstepVertexInputFormat.class,
SimpleSuperstepVertexOutputFormat.class, outputPath);
assertTrue(job.run(true));
@@ -100,6 +102,7 @@ public class TestGraphPartitioner extend
job = prepareJob("testSuperstepHashPartitioner",
SimpleCheckpointVertex.class,
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
SimpleSuperstepVertexInputFormat.class,
SimpleSuperstepVertexOutputFormat.class,
outputPath);
@@ -115,6 +118,8 @@ public class TestGraphPartitioner extend
job.setVertexClass(SimpleCheckpointVertex.class);
job.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ job.setMasterComputeClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
job.setGraphPartitionerFactoryClass(
@@ -128,6 +133,7 @@ public class TestGraphPartitioner extend
job = prepareJob("testReverseIdSuperstepHashPartitioner",
SimpleCheckpointVertex.class,
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
SimpleSuperstepVertexInputFormat.class,
SimpleSuperstepVertexOutputFormat.class, outputPath);
job.setGraphPartitionerFactoryClass(
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java Thu Aug 9 09:10:57 2012
@@ -53,6 +53,7 @@ public class TestManualCheckpoint extend
GiraphJob job = prepareJob(getCallingMethodName(),
SimpleCheckpointVertex.class,
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
SimpleSuperstepVertexInputFormat.class,
SimpleSuperstepVertexOutputFormat.class, outputPath);
@@ -81,8 +82,11 @@ public class TestManualCheckpoint extend
GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
SimpleCheckpointVertex.class,
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
SimpleSuperstepVertexInputFormat.class,
SimpleSuperstepVertexOutputFormat.class, outputPath);
+ job.setMasterComputeClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
restartedJob.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
checkpointsDir.toString());
Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java Thu Aug 9 09:10:57 2012
@@ -29,13 +29,13 @@ public class TestBooleanAggregators {
public void testAndAggregator() {
BooleanAndAggregator and = new BooleanAndAggregator();
assertEquals(true, and.getAggregatedValue().get());
- and.aggregate(true);
+ and.aggregate(new BooleanWritable(true));
assertEquals(true, and.getAggregatedValue().get());
and.aggregate(new BooleanWritable(false));
assertEquals(false, and.getAggregatedValue().get());
- and.setAggregatedValue(true);
+ and.setAggregatedValue(new BooleanWritable(true));
assertEquals(true, and.getAggregatedValue().get());
- BooleanWritable bw = and.createAggregatedValue();
+ BooleanWritable bw = and.createInitialValue();
assertNotNull(bw);
}
@@ -43,26 +43,26 @@ public class TestBooleanAggregators {
public void testOrAggregator() {
BooleanOrAggregator or = new BooleanOrAggregator();
assertEquals(false, or.getAggregatedValue().get());
- or.aggregate(false);
+ or.aggregate(new BooleanWritable(false));
assertEquals(false, or.getAggregatedValue().get());
or.aggregate(new BooleanWritable(true));
assertEquals(true, or.getAggregatedValue().get());
- or.setAggregatedValue(false);
+ or.setAggregatedValue(new BooleanWritable(false));
assertEquals(false, or.getAggregatedValue().get());
- BooleanWritable bw = or.createAggregatedValue();
+ BooleanWritable bw = or.createInitialValue();
assertNotNull(bw);
}
@Test
public void testOverwriteAggregator() {
BooleanOverwriteAggregator overwrite = new BooleanOverwriteAggregator();
- overwrite.aggregate(true);
+ overwrite.aggregate(new BooleanWritable(true));
assertEquals(true, overwrite.getAggregatedValue().get());
overwrite.aggregate(new BooleanWritable(false));
assertEquals(false, overwrite.getAggregatedValue().get());
- overwrite.setAggregatedValue(true);
+ overwrite.setAggregatedValue(new BooleanWritable(true));
assertEquals(true, overwrite.getAggregatedValue().get());
- BooleanWritable bw = overwrite.createAggregatedValue();
+ BooleanWritable bw = overwrite.createInitialValue();
assertNotNull(bw);
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java Thu Aug 9 09:10:57 2012
@@ -28,61 +28,61 @@ public class TestDoubleAggregators {
@Test
public void testMaxAggregator() {
DoubleMaxAggregator max = new DoubleMaxAggregator();
- max.aggregate(2.0);
+ max.aggregate(new DoubleWritable(2.0));
max.aggregate(new DoubleWritable(3.0));
assertEquals(3.0, max.getAggregatedValue().get());
- max.setAggregatedValue(1.0);
+ max.setAggregatedValue(new DoubleWritable(1.0));
assertEquals(1.0, max.getAggregatedValue().get());
- DoubleWritable dw = max.createAggregatedValue();
+ DoubleWritable dw = max.createInitialValue();
assertNotNull(dw);
}
@Test
public void testMinAggregator() {
DoubleMinAggregator min = new DoubleMinAggregator();
- min.aggregate(3.0);
+ min.aggregate(new DoubleWritable(3.0));
min.aggregate(new DoubleWritable(2.0));
assertEquals(2.0, min.getAggregatedValue().get());
- min.setAggregatedValue(3.0);
+ min.setAggregatedValue(new DoubleWritable(3.0));
assertEquals(3.0, min.getAggregatedValue().get());
- DoubleWritable dw = min.createAggregatedValue();
+ DoubleWritable dw = min.createInitialValue();
assertNotNull(dw);
}
@Test
public void testOverwriteAggregator() {
DoubleOverwriteAggregator overwrite = new DoubleOverwriteAggregator();
- overwrite.aggregate(1.0);
+ overwrite.aggregate(new DoubleWritable(1.0));
assertEquals(1.0, overwrite.getAggregatedValue().get());
overwrite.aggregate(new DoubleWritable(2.0));
assertEquals(2.0, overwrite.getAggregatedValue().get());
- overwrite.setAggregatedValue(3.0);
+ overwrite.setAggregatedValue(new DoubleWritable(3.0));
assertEquals(3.0, overwrite.getAggregatedValue().get());
- DoubleWritable dw = overwrite.createAggregatedValue();
+ DoubleWritable dw = overwrite.createInitialValue();
assertNotNull(dw);
}
-
+
@Test
public void testProductAggregator() {
DoubleProductAggregator product = new DoubleProductAggregator();
- product.aggregate(6.0);
+ product.aggregate(new DoubleWritable(6.0));
product.aggregate(new DoubleWritable(7.0));
assertEquals(42.0, product.getAggregatedValue().get());
- product.setAggregatedValue(1.0);
+ product.setAggregatedValue(new DoubleWritable(1.0));
assertEquals(1.0, product.getAggregatedValue().get());
- DoubleWritable dw = product.createAggregatedValue();
+ DoubleWritable dw = product.createInitialValue();
assertNotNull(dw);
}
@Test
public void testSumAggregator() {
DoubleSumAggregator sum = new DoubleSumAggregator();
- sum.aggregate(1.0);
+ sum.aggregate(new DoubleWritable(1.0));
sum.aggregate(new DoubleWritable(2.0));
assertEquals(3.0, sum.getAggregatedValue().get());
- sum.setAggregatedValue(4.0);
+ sum.setAggregatedValue(new DoubleWritable(4.0));
assertEquals(4.0, sum.getAggregatedValue().get());
- DoubleWritable dw = sum.createAggregatedValue();
+ DoubleWritable dw = sum.createInitialValue();
assertNotNull(dw);
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java Thu Aug 9 09:10:57 2012
@@ -28,61 +28,61 @@ public class TestFloatAggregators {
@Test
public void testMaxAggregator() {
FloatMaxAggregator max = new FloatMaxAggregator();
- max.aggregate(2.0f);
+ max.aggregate(new FloatWritable(2.0f));
max.aggregate(new FloatWritable(3.0f));
assertEquals(3.0f, max.getAggregatedValue().get());
- max.setAggregatedValue(1.0f);
+ max.setAggregatedValue(new FloatWritable(1.0f));
assertEquals(1.0f, max.getAggregatedValue().get());
- FloatWritable fw = max.createAggregatedValue();
+ FloatWritable fw = max.createInitialValue();
assertNotNull(fw);
}
@Test
public void testMinAggregator() {
FloatMinAggregator min = new FloatMinAggregator();
- min.aggregate(3.0f);
+ min.aggregate(new FloatWritable(3.0f));
min.aggregate(new FloatWritable(2.0f));
assertEquals(2.0f, min.getAggregatedValue().get());
- min.setAggregatedValue(3.0f);
+ min.setAggregatedValue(new FloatWritable(3.0f));
assertEquals(3.0f, min.getAggregatedValue().get());
- FloatWritable fw = min.createAggregatedValue();
+ FloatWritable fw = min.createInitialValue();
assertNotNull(fw);
}
@Test
public void testOverwriteAggregator() {
FloatOverwriteAggregator overwrite = new FloatOverwriteAggregator();
- overwrite.aggregate(1.0f);
+ overwrite.aggregate(new FloatWritable(1.0f));
assertEquals(1.0f, overwrite.getAggregatedValue().get());
overwrite.aggregate(new FloatWritable(2.0f));
assertEquals(2.0f, overwrite.getAggregatedValue().get());
- overwrite.setAggregatedValue(3.0f);
+ overwrite.setAggregatedValue(new FloatWritable(3.0f));
assertEquals(3.0f, overwrite.getAggregatedValue().get());
- FloatWritable fw = overwrite.createAggregatedValue();
+ FloatWritable fw = overwrite.createInitialValue();
assertNotNull(fw);
}
-
+
@Test
public void testProductAggregator() {
FloatProductAggregator product = new FloatProductAggregator();
- product.aggregate(6.0f);
+ product.aggregate(new FloatWritable(6.0f));
product.aggregate(new FloatWritable(7.0f));
assertEquals(42.0f, product.getAggregatedValue().get());
- product.setAggregatedValue(1.0f);
+ product.setAggregatedValue(new FloatWritable(1.0f));
assertEquals(1.0f, product.getAggregatedValue().get());
- FloatWritable fw = product.createAggregatedValue();
+ FloatWritable fw = product.createInitialValue();
assertNotNull(fw);
}
@Test
public void testSumAggregator() {
FloatSumAggregator sum = new FloatSumAggregator();
- sum.aggregate(1.0f);
+ sum.aggregate(new FloatWritable(1.0f));
sum.aggregate(new FloatWritable(2.0f));
assertEquals(3.0f, sum.getAggregatedValue().get());
- sum.setAggregatedValue(4.0f);
+ sum.setAggregatedValue(new FloatWritable(4.0f));
assertEquals(4.0f, sum.getAggregatedValue().get());
- FloatWritable fw = sum.createAggregatedValue();
+ FloatWritable fw = sum.createInitialValue();
assertNotNull(fw);
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java Thu Aug 9 09:10:57 2012
@@ -28,61 +28,61 @@ public class TestIntAggregators {
@Test
public void testMaxAggregator() {
IntMaxAggregator max = new IntMaxAggregator();
- max.aggregate(2);
+ max.aggregate(new IntWritable(2));
max.aggregate(new IntWritable(3));
assertEquals(3, max.getAggregatedValue().get());
- max.setAggregatedValue(1);
+ max.setAggregatedValue(new IntWritable(1));
assertEquals(1, max.getAggregatedValue().get());
- IntWritable iw = max.createAggregatedValue();
+ IntWritable iw = max.createInitialValue();
assertNotNull(iw);
}
@Test
public void testMinAggregator() {
IntMinAggregator min = new IntMinAggregator();
- min.aggregate(3);
+ min.aggregate(new IntWritable(3));
min.aggregate(new IntWritable(2));
assertEquals(2, min.getAggregatedValue().get());
- min.setAggregatedValue(3);
+ min.setAggregatedValue(new IntWritable(3));
assertEquals(3, min.getAggregatedValue().get());
- IntWritable iw = min.createAggregatedValue();
+ IntWritable iw = min.createInitialValue();
assertNotNull(iw);
}
@Test
public void testOverwriteAggregator() {
IntOverwriteAggregator overwrite = new IntOverwriteAggregator();
- overwrite.aggregate(1);
+ overwrite.aggregate(new IntWritable(1));
assertEquals(1, overwrite.getAggregatedValue().get());
overwrite.aggregate(new IntWritable(2));
assertEquals(2, overwrite.getAggregatedValue().get());
- overwrite.setAggregatedValue(3);
+ overwrite.setAggregatedValue(new IntWritable(3));
assertEquals(3, overwrite.getAggregatedValue().get());
- IntWritable iw = overwrite.createAggregatedValue();
+ IntWritable iw = overwrite.createInitialValue();
assertNotNull(iw);
}
@Test
public void testProductAggregator() {
IntProductAggregator product = new IntProductAggregator();
- product.aggregate(6);
+ product.aggregate(new IntWritable(6));
product.aggregate(new IntWritable(7));
assertEquals(42, product.getAggregatedValue().get());
- product.setAggregatedValue(1);
+ product.setAggregatedValue(new IntWritable(1));
assertEquals(1, product.getAggregatedValue().get());
- IntWritable iw = product.createAggregatedValue();
+ IntWritable iw = product.createInitialValue();
assertNotNull(iw);
}
@Test
public void testSumAggregator() {
IntSumAggregator sum = new IntSumAggregator();
- sum.aggregate(1);
+ sum.aggregate(new IntWritable(1));
sum.aggregate(new IntWritable(2));
assertEquals(3, sum.getAggregatedValue().get());
- sum.setAggregatedValue(4);
+ sum.setAggregatedValue(new IntWritable(4));
assertEquals(4, sum.getAggregatedValue().get());
- IntWritable iw = sum.createAggregatedValue();
+ IntWritable iw = sum.createInitialValue();
assertNotNull(iw);
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java Thu Aug 9 09:10:57 2012
@@ -28,61 +28,61 @@ public class TestLongAggregators {
@Test
public void testMaxAggregator() {
LongMaxAggregator max = new LongMaxAggregator();
- max.aggregate(2L);
+ max.aggregate(new LongWritable(2L));
max.aggregate(new LongWritable(3L));
assertEquals(3L, max.getAggregatedValue().get());
- max.setAggregatedValue(1L);
+ max.setAggregatedValue(new LongWritable(1L));
assertEquals(1L, max.getAggregatedValue().get());
- LongWritable lw = max.createAggregatedValue();
+ LongWritable lw = max.createInitialValue();
assertNotNull(lw);
}
@Test
public void testMinAggregator() {
LongMinAggregator min = new LongMinAggregator();
- min.aggregate(3L);
+ min.aggregate(new LongWritable(3L));
min.aggregate(new LongWritable(2L));
assertEquals(2L, min.getAggregatedValue().get());
- min.setAggregatedValue(3L);
+ min.setAggregatedValue(new LongWritable(3L));
assertEquals(3L, min.getAggregatedValue().get());
- LongWritable lw = min.createAggregatedValue();
+ LongWritable lw = min.createInitialValue();
assertNotNull(lw);
}
@Test
public void testOverwriteAggregator() {
LongOverwriteAggregator overwrite = new LongOverwriteAggregator();
- overwrite.aggregate(1L);
+ overwrite.aggregate(new LongWritable(1L));
assertEquals(1L, overwrite.getAggregatedValue().get());
overwrite.aggregate(new LongWritable(2L));
assertEquals(2L, overwrite.getAggregatedValue().get());
- overwrite.setAggregatedValue(3L);
+ overwrite.setAggregatedValue(new LongWritable(3L));
assertEquals(3L, overwrite.getAggregatedValue().get());
- LongWritable lw = overwrite.createAggregatedValue();
+ LongWritable lw = overwrite.createInitialValue();
assertNotNull(lw);
}
@Test
public void testProductAggregator() {
LongProductAggregator product = new LongProductAggregator();
- product.aggregate(6L);
+ product.aggregate(new LongWritable(6L));
product.aggregate(new LongWritable(7L));
assertEquals(42L, product.getAggregatedValue().get());
- product.setAggregatedValue(1L);
+ product.setAggregatedValue(new LongWritable(1L));
assertEquals(1L, product.getAggregatedValue().get());
- LongWritable lw = product.createAggregatedValue();
+ LongWritable lw = product.createInitialValue();
assertNotNull(lw);
}
@Test
public void testSumAggregator() {
LongSumAggregator sum = new LongSumAggregator();
- sum.aggregate(1L);
+ sum.aggregate(new LongWritable(1L));
sum.aggregate(new LongWritable(2L));
assertEquals(3L, sum.getAggregatedValue().get());
- sum.setAggregatedValue(4L);
+ sum.setAggregatedValue(new LongWritable(4L));
assertEquals(4L, sum.getAggregatedValue().get());
- LongWritable lw = sum.createAggregatedValue();
+ LongWritable lw = sum.createInitialValue();
assertNotNull(lw);
}