You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/09 11:10:59 UTC
svn commit: r1371108 [1/2] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/aggregators/
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/
src/main/java/org/apache/giraph/examples/
src/main/java/org/apache/giraph/gra...
Author: apresta
Date: Thu Aug 9 09:10:57 2012
New Revision: 1371108
URL: http://svn.apache.org/viewvc?rev=1371108&view=rev
Log:
GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)
Added:
giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java
Removed:
giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java
giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java
giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java
giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java
giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java
giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java
giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Aug 9 09:10:57 2012
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)
+
GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP by # of
vertices results in wide variance in RPC message sizes. (Eli Reisman via jghoman)
Added: giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java Thu Aug 9 09:10:57 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators;
+
+import org.apache.giraph.graph.Aggregator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Abstract base class for {@link Aggregator} implementations.
+ * Stores the aggregated value and implements the get, set and reset methods;
+ * subclasses supply the remaining {@link Aggregator} methods.
+ *
+ * @param <A> Type of the aggregated value
+ */
+public abstract class BasicAggregator<A extends Writable> implements
+ Aggregator<A> {
+ /** Current aggregated value, replaced by set and reset */
+ private A value;
+
+ /**
+ * Default constructor.
+ * Sets the value from createInitialValue(), run before subclass fields are set
+ */
+ public BasicAggregator() {
+ value = createInitialValue();
+ }
+
+ @Override
+ public A getAggregatedValue() {
+ return value;
+ }
+
+ @Override
+ public void setAggregatedValue(A value) {
+ this.value = value;
+ }
+
+ @Override
+ public void reset() {
+ value = createInitialValue();
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,51 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.BooleanWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for calculating the AND function over boolean values.
* The default value when nothing is aggregated is true.
*/
-public class BooleanAndAggregator implements Aggregator<BooleanWritable> {
- /** Internal result */
- private boolean result = true;
-
- /**
- * Aggregate with a primitive boolean.
- *
- * @param value Boolean value to aggregate.
- */
- public void aggregate(boolean value) {
- result = result && value;
- }
-
+public class BooleanAndAggregator extends BasicAggregator<BooleanWritable> {
@Override
public void aggregate(BooleanWritable value) {
- result = result && value.get();
- }
-
- /**
- * Set aggregated value using a primitive boolean.
- *
- * @param value Boolean value to set.
- */
- public void setAggregatedValue(boolean value) {
- result = value;
- }
-
- @Override
- public void setAggregatedValue(BooleanWritable value) {
- result = value.get();
- }
-
- @Override
- public BooleanWritable getAggregatedValue() {
- return new BooleanWritable(result);
+ getAggregatedValue().set(getAggregatedValue().get() && value.get());
}
@Override
- public BooleanWritable createAggregatedValue() {
- return new BooleanWritable();
+ public BooleanWritable createInitialValue() {
+ return new BooleanWritable(true);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,51 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.BooleanWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for calculating the OR function over boolean values.
* The default value when nothing is aggregated is false.
*/
-public class BooleanOrAggregator implements Aggregator<BooleanWritable> {
- /** Internal result */
- private boolean result = false;
-
- /**
- * Aggregate with a primitive boolean.
- *
- * @param value Boolean value to aggregate.
- */
- public void aggregate(boolean value) {
- result = result || value;
- }
-
+public class BooleanOrAggregator extends BasicAggregator<BooleanWritable> {
@Override
public void aggregate(BooleanWritable value) {
- result = result || value.get();
- }
-
- /**
- * Set aggregated value using a primitive boolean.
- *
- * @param value Boolean value to set.
- */
- public void setAggregatedValue(boolean value) {
- result = value;
- }
-
- @Override
- public void setAggregatedValue(BooleanWritable value) {
- result = value.get();
- }
-
- @Override
- public BooleanWritable getAggregatedValue() {
- return new BooleanWritable(result);
+ getAggregatedValue().set(getAggregatedValue().get() || value.get());
}
@Override
- public BooleanWritable createAggregatedValue() {
- return new BooleanWritable();
+ public BooleanWritable createInitialValue() {
+ return new BooleanWritable(false);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,8 +20,6 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.BooleanWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator that stores a value that is overwritten once another value is
* aggregated. This aggregator is useful for one-to-many communication from
@@ -29,45 +27,15 @@ import org.apache.giraph.graph.Aggregato
* to this aggregator, its behavior is non-deterministic. The default value
* for this aggregator is false.
*/
-public class BooleanOverwriteAggregator implements Aggregator<BooleanWritable> {
- /** Internal result */
- private boolean result = false;
-
- /**
- * Aggregate with a primitive boolean.
- *
- * @param value Boolean value to aggregate.
- */
- public void aggregate(boolean value) {
- result = value;
- }
-
+public class BooleanOverwriteAggregator extends
+ BasicAggregator<BooleanWritable> {
@Override
public void aggregate(BooleanWritable value) {
- result = value.get();
- }
-
- /**
- * Set aggregated value using a primitive boolean.
- *
- * @param value Boolean value to set.
- */
- public void setAggregatedValue(boolean value) {
- result = value;
- }
-
- @Override
- public void setAggregatedValue(BooleanWritable value) {
- result = value.get();
- }
-
- @Override
- public BooleanWritable getAggregatedValue() {
- return new BooleanWritable(result);
+ getAggregatedValue().set(value.get());
}
@Override
- public BooleanWritable createAggregatedValue() {
- return new BooleanWritable();
+ public BooleanWritable createInitialValue() {
+ return new BooleanWritable(false);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for getting max double value.
- *
*/
-public class DoubleMaxAggregator implements Aggregator<DoubleWritable> {
- /** Saved maximum value */
- private double max = Double.MIN_VALUE;
-
- /**
- * Aggregate with a primitive double.
- *
- * @param value Double value to aggregate.
- */
- public void aggregate(double value) {
- double val = value;
- if (val > max) {
- max = val;
- }
- }
-
+public class DoubleMaxAggregator extends BasicAggregator<DoubleWritable> {
@Override
public void aggregate(DoubleWritable value) {
- double val = value.get();
- if (val > max) {
- max = val;
- }
- }
-
- /**
- * Set aggregated value using a primitive double.
- *
- * @param value Double value to set.
- */
- public void setAggregatedValue(double value) {
- max = value;
- }
-
- @Override
- public void setAggregatedValue(DoubleWritable value) {
- max = value.get();
- }
-
- @Override
- public DoubleWritable getAggregatedValue() {
- return new DoubleWritable(max);
+ getAggregatedValue().set(
+ Math.max(getAggregatedValue().get(), value.get()));
}
@Override
- public DoubleWritable createAggregatedValue() {
- return new DoubleWritable();
+ public DoubleWritable createInitialValue() {
+ return new DoubleWritable(Double.NEGATIVE_INFINITY); // identity for max
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for getting min double value.
*/
-public class DoubleMinAggregator implements Aggregator<DoubleWritable> {
- /** Internal aggregator */
- private double min = Double.MAX_VALUE;
-
- /**
- * Aggregate with a primitive double.
- *
- * @param value Double value to aggregate.
- */
- public void aggregate(double value) {
- double val = value;
- if (val < min) {
- min = val;
- }
- }
-
+public class DoubleMinAggregator extends BasicAggregator<DoubleWritable> {
@Override
public void aggregate(DoubleWritable value) {
- double val = value.get();
- if (val < min) {
- min = val;
- }
- }
-
- /**
- * Set aggregated value using a primitive double.
- *
- * @param value Double value to set.
- */
- public void setAggregatedValue(double value) {
- min = value;
+ getAggregatedValue().set(
+ Math.min(getAggregatedValue().get(), value.get()));
}
@Override
- public void setAggregatedValue(DoubleWritable value) {
- min = value.get();
+ public DoubleWritable createInitialValue() {
+ return new DoubleWritable(Double.MAX_VALUE);
}
-
- @Override
- public DoubleWritable getAggregatedValue() {
- return new DoubleWritable(min);
- }
-
- @Override
- public DoubleWritable createAggregatedValue() {
- return new DoubleWritable();
- }
-
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,53 +20,21 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator that stores a value that is overwritten once another value is
* aggregated. This aggregator is useful for one-to-many communication from
* master.compute() or from a special vertex. In case multiple vertices write
* to this aggregator, its behavior is non-deterministic.
*/
-public class DoubleOverwriteAggregator implements Aggregator<DoubleWritable> {
- /** Internal result */
- private double result = 0.0;
-
- /**
- * Aggregate with a primitive double.
- *
- * @param value Double value to aggregate.
- */
- public void aggregate(double value) {
- result = value;
- }
-
+public class DoubleOverwriteAggregator extends
+ BasicAggregator<DoubleWritable> {
@Override
public void aggregate(DoubleWritable value) {
- result = value.get();
- }
-
- /**
- * Set aggregated value using a primitive double.
- *
- * @param value Double value to set.
- */
- public void setAggregatedValue(double value) {
- result = value;
- }
-
- @Override
- public void setAggregatedValue(DoubleWritable value) {
- result = value.get();
- }
-
- @Override
- public DoubleWritable getAggregatedValue() {
- return new DoubleWritable(result);
+ getAggregatedValue().set(value.get());
}
@Override
- public DoubleWritable createAggregatedValue() {
- return new DoubleWritable();
+ public DoubleWritable createInitialValue() {
+ return new DoubleWritable(0);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,51 +20,17 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for calculating products of double values.
*/
-public class DoubleProductAggregator implements Aggregator<DoubleWritable> {
- /** Aggregated product */
- private double product = 1.0;
-
- /**
- * Aggregate a primitive double.
- *
- * @param value Double value to aggregate.
- */
- public void aggregate(double value) {
- product *= value;
- }
-
+public class DoubleProductAggregator extends BasicAggregator<DoubleWritable> {
@Override
public void aggregate(DoubleWritable value) {
- product *= value.get();
- }
-
- /**
- * Set aggregated value using a primitive double.
- *
- * @param value Double value to set.
- */
- public void setAggregatedValue(double value) {
- product = value;
+ getAggregatedValue().set(getAggregatedValue().get() * value.get());
}
@Override
- public void setAggregatedValue(DoubleWritable value) {
- product = value.get();
+ public DoubleWritable createInitialValue() {
+ return new DoubleWritable(1);
}
-
- @Override
- public DoubleWritable getAggregatedValue() {
- return new DoubleWritable(product);
- }
-
- @Override
- public DoubleWritable createAggregatedValue() {
- return new DoubleWritable();
- }
-
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,51 +20,15 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.giraph.graph.Aggregator;
-
-/**
- * Aggregator for summing up double values.
- */
-public class DoubleSumAggregator implements Aggregator<DoubleWritable> {
- /** Aggregated sum */
- private double sum = 0;
-
- /**
- * Aggregate a primitive double.
- *
- * @param value Double value to aggregate.
- */
- public void aggregate(double value) {
- sum += value;
- }
-
+/** Aggregator for summing up double values. */
+public class DoubleSumAggregator extends BasicAggregator<DoubleWritable> {
@Override
public void aggregate(DoubleWritable value) {
- sum += value.get();
- }
-
- /**
- * Set aggregated value using a primitive double.
- *
- * @param value Double value to set.
- */
- public void setAggregatedValue(double value) {
- sum = value;
+ getAggregatedValue().set(getAggregatedValue().get() + value.get());
}
@Override
- public void setAggregatedValue(DoubleWritable value) {
- sum = value.get();
+ public DoubleWritable createInitialValue() {
+ return new DoubleWritable(0);
}
-
- @Override
- public DoubleWritable getAggregatedValue() {
- return new DoubleWritable(sum);
- }
-
- @Override
- public DoubleWritable createAggregatedValue() {
- return new DoubleWritable();
- }
-
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for getting max float value.
- *
*/
-public class FloatMaxAggregator implements Aggregator<FloatWritable> {
- /** Saved maximum value */
- private float max = Float.MIN_VALUE;
-
- /**
- * Aggregate with a primitive float.
- *
- * @param value Float value to aggregate.
- */
- public void aggregate(float value) {
- float val = value;
- if (val > max) {
- max = val;
- }
- }
-
+public class FloatMaxAggregator extends BasicAggregator<FloatWritable> {
@Override
public void aggregate(FloatWritable value) {
- float val = value.get();
- if (val > max) {
- max = val;
- }
- }
-
- /**
- * Set aggregated value using a primitive float.
- *
- * @param value Float value to set.
- */
- public void setAggregatedValue(float value) {
- max = value;
- }
-
- @Override
- public void setAggregatedValue(FloatWritable value) {
- max = value.get();
- }
-
- @Override
- public FloatWritable getAggregatedValue() {
- return new FloatWritable(max);
+ getAggregatedValue().set(
+ Math.max(getAggregatedValue().get(), value.get()));
}
@Override
- public FloatWritable createAggregatedValue() {
- return new FloatWritable();
+ public FloatWritable createInitialValue() {
+ return new FloatWritable(Float.NEGATIVE_INFINITY); // identity for max
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for getting min float value.
*/
-public class FloatMinAggregator implements Aggregator<FloatWritable> {
- /** Internal aggregator */
- private float min = Float.MAX_VALUE;
-
- /**
- * Aggregate with a primitive float.
- *
- * @param value Float value to aggregate.
- */
- public void aggregate(float value) {
- float val = value;
- if (val < min) {
- min = val;
- }
- }
-
+public class FloatMinAggregator extends BasicAggregator<FloatWritable> {
@Override
public void aggregate(FloatWritable value) {
- float val = value.get();
- if (val < min) {
- min = val;
- }
- }
-
- /**
- * Set aggregated value using a primitive float.
- *
- * @param value Float value to set.
- */
- public void setAggregatedValue(float value) {
- min = value;
+ getAggregatedValue().set(
+ Math.min(getAggregatedValue().get(), value.get()));
}
@Override
- public void setAggregatedValue(FloatWritable value) {
- min = value.get();
+ public FloatWritable createInitialValue() {
+ return new FloatWritable(Float.MAX_VALUE);
}
-
- @Override
- public FloatWritable getAggregatedValue() {
- return new FloatWritable(min);
- }
-
- @Override
- public FloatWritable createAggregatedValue() {
- return new FloatWritable();
- }
-
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,53 +20,20 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator that stores a value that is overwritten once another value is
* aggregated. This aggregator is useful for one-to-many communication from
* master.compute() or from a special vertex. In case multiple vertices write
* to this aggregator, its behavior is non-deterministic.
*/
-public class FloatOverwriteAggregator implements Aggregator<FloatWritable> {
- /** Internal result */
- private float result = 0.0f;
-
- /**
- * Aggregate with a primitive float.
- *
- * @param value Float value to aggregate.
- */
- public void aggregate(float value) {
- result = value;
- }
-
+public class FloatOverwriteAggregator extends BasicAggregator<FloatWritable> {
@Override
public void aggregate(FloatWritable value) {
- result = value.get();
- }
-
- /**
- * Set aggregated value using a primitive float.
- *
- * @param value Float value to set.
- */
- public void setAggregatedValue(float value) {
- result = value;
- }
-
- @Override
- public void setAggregatedValue(FloatWritable value) {
- result = value.get();
- }
-
- @Override
- public FloatWritable getAggregatedValue() {
- return new FloatWritable(result);
+ getAggregatedValue().set(value.get());
}
@Override
- public FloatWritable createAggregatedValue() {
- return new FloatWritable();
+ public FloatWritable createInitialValue() {
+ return new FloatWritable(0);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,51 +20,17 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for calculating products of float values.
*/
-public class FloatProductAggregator implements Aggregator<FloatWritable> {
- /** Aggregated product */
- private float product = 1.0f;
-
- /**
- * Aggregate a primitive float.
- *
- * @param value Float value to aggregate.
- */
- public void aggregate(float value) {
- product *= value;
- }
-
+public class FloatProductAggregator extends BasicAggregator<FloatWritable> {
@Override
public void aggregate(FloatWritable value) {
- product *= value.get();
- }
-
- /**
- * Set aggregated value using a primitive float.
- *
- * @param value Float value to set.
- */
- public void setAggregatedValue(float value) {
- product = value;
+ getAggregatedValue().set(getAggregatedValue().get() * value.get());
}
@Override
- public void setAggregatedValue(FloatWritable value) {
- product = value.get();
+ public FloatWritable createInitialValue() {
+ return new FloatWritable(1);
}
-
- @Override
- public FloatWritable getAggregatedValue() {
- return new FloatWritable(product);
- }
-
- @Override
- public FloatWritable createAggregatedValue() {
- return new FloatWritable();
- }
-
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,51 +20,17 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for summing up float values.
*/
-public class FloatSumAggregator implements Aggregator<FloatWritable> {
- /** Aggregated sum */
- private float sum = 0;
-
- /**
- * Aggregate a primitive float.
- *
- * @param value Float value to aggregate.
- */
- public void aggregate(float value) {
- sum += value;
- }
-
+public class FloatSumAggregator extends BasicAggregator<FloatWritable> {
@Override
public void aggregate(FloatWritable value) {
- sum += value.get();
- }
-
- /**
- * Set aggregated value using a primitive float.
- *
- * @param value Float value to set.
- */
- public void setAggregatedValue(float value) {
- sum = value;
+ getAggregatedValue().set(getAggregatedValue().get() + value.get());
}
@Override
- public void setAggregatedValue(FloatWritable value) {
- sum = value.get();
+ public FloatWritable createInitialValue() {
+ return new FloatWritable(0);
}
-
- @Override
- public FloatWritable getAggregatedValue() {
- return new FloatWritable(sum);
- }
-
- @Override
- public FloatWritable createAggregatedValue() {
- return new FloatWritable();
- }
-
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.IntWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for getting max integer value.
- *
*/
-public class IntMaxAggregator implements Aggregator<IntWritable> {
- /** Saved maximum value */
- private int max = Integer.MIN_VALUE;
-
- /**
- * Aggregate with a primitive integer.
- *
- * @param value Integer value to aggregate.
- */
- public void aggregate(int value) {
- int val = value;
- if (val > max) {
- max = val;
- }
- }
-
+public class IntMaxAggregator extends BasicAggregator<IntWritable> {
@Override
public void aggregate(IntWritable value) {
- int val = value.get();
- if (val > max) {
- max = val;
- }
- }
-
- /**
- * Set aggregated value using a primitive integer.
- *
- * @param value Integer value to set.
- */
- public void setAggregatedValue(int value) {
- max = value;
- }
-
- @Override
- public void setAggregatedValue(IntWritable value) {
- max = value.get();
- }
-
- @Override
- public IntWritable getAggregatedValue() {
- return new IntWritable(max);
+ getAggregatedValue().set(
+ Math.max(getAggregatedValue().get(), value.get()));
}
@Override
- public IntWritable createAggregatedValue() {
- return new IntWritable();
+ public IntWritable createInitialValue() {
+ return new IntWritable(Integer.MIN_VALUE);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.IntWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for getting min integer value.
*/
-public class IntMinAggregator implements Aggregator<IntWritable> {
- /** Internal aggregator */
- private int min = Integer.MAX_VALUE;
-
- /**
- * Aggregate with a primitive integer.
- *
- * @param value Integer value to aggregate.
- */
- public void aggregate(int value) {
- int val = value;
- if (val < min) {
- min = val;
- }
- }
-
+public class IntMinAggregator extends BasicAggregator<IntWritable> {
@Override
public void aggregate(IntWritable value) {
- int val = value.get();
- if (val < min) {
- min = val;
- }
- }
-
- /**
- * Set aggregated value using a primitive integer.
- *
- * @param value Integer value to set.
- */
- public void setAggregatedValue(int value) {
- min = value;
+ getAggregatedValue().set(
+ Math.min(getAggregatedValue().get(), value.get()));
}
@Override
- public void setAggregatedValue(IntWritable value) {
- min = value.get();
+ public IntWritable createInitialValue() {
+ return new IntWritable(Integer.MAX_VALUE);
}
-
- @Override
- public IntWritable getAggregatedValue() {
- return new IntWritable(min);
- }
-
- @Override
- public IntWritable createAggregatedValue() {
- return new IntWritable();
- }
-
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,53 +20,20 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.IntWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator that stores a value that is overwritten once another value is
* aggregated. This aggregator is useful for one-to-many communication from
* master.compute() or from a special vertex. In case multiple vertices write
* to this aggregator, its behavior is non-deterministic.
*/
-public class IntOverwriteAggregator implements Aggregator<IntWritable> {
- /** Internal result */
- private int result = 0;
-
- /**
- * Aggregate with a primitive integer.
- *
- * @param value Integer value to aggregate.
- */
- public void aggregate(int value) {
- result = value;
- }
-
+public class IntOverwriteAggregator extends BasicAggregator<IntWritable> {
@Override
public void aggregate(IntWritable value) {
- result = value.get();
- }
-
- /**
- * Set aggregated value using a primitive integer.
- *
- * @param value Integer value to set.
- */
- public void setAggregatedValue(int value) {
- result = value;
- }
-
- @Override
- public void setAggregatedValue(IntWritable value) {
- result = value.get();
- }
-
- @Override
- public IntWritable getAggregatedValue() {
- return new IntWritable(result);
+ getAggregatedValue().set(value.get());
}
@Override
- public IntWritable createAggregatedValue() {
- return new IntWritable();
+ public IntWritable createInitialValue() {
+ return new IntWritable(0);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,50 +20,17 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.IntWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for calculating products of integer values.
*/
-public class IntProductAggregator implements Aggregator<IntWritable> {
- /** Internal product */
- private int product = 1;
-
- /**
- * Aggregate a primitive integer.
- *
- * @param value Integer value to aggregate.
- */
- public void aggregate(int value) {
- product *= value;
- }
-
+public class IntProductAggregator extends BasicAggregator<IntWritable> {
@Override
public void aggregate(IntWritable value) {
- product *= value.get();
- }
-
- /**
- * Set aggregated value using a primitive integer.
- *
- * @param value Integer value to set.
- */
- public void setAggregatedValue(int value) {
- product = value;
- }
-
- @Override
- public void setAggregatedValue(IntWritable value) {
- product = value.get();
- }
-
- @Override
- public IntWritable getAggregatedValue() {
- return new IntWritable(product);
+ getAggregatedValue().set(getAggregatedValue().get() * value.get());
}
@Override
- public IntWritable createAggregatedValue() {
- return new IntWritable();
+ public IntWritable createInitialValue() {
+ return new IntWritable(1);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,50 +20,17 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.IntWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for summing up integer values.
*/
-public class IntSumAggregator implements Aggregator<IntWritable> {
- /** Internal sum */
- private int sum = 0;
-
- /**
- * Aggregate a primitive integer.
- *
- * @param value Integer value to aggregate.
- */
- public void aggregate(int value) {
- sum += value;
- }
-
+public class IntSumAggregator extends BasicAggregator<IntWritable> {
@Override
public void aggregate(IntWritable value) {
- sum += value.get();
- }
-
- /**
- * Set aggregated value using a primitive integer.
- *
- * @param value Integer value to set.
- */
- public void setAggregatedValue(int value) {
- sum = value;
- }
-
- @Override
- public void setAggregatedValue(IntWritable value) {
- sum = value.get();
- }
-
- @Override
- public IntWritable getAggregatedValue() {
- return new IntWritable(sum);
+ getAggregatedValue().set(getAggregatedValue().get() + value.get());
}
@Override
- public IntWritable createAggregatedValue() {
- return new IntWritable();
+ public IntWritable createInitialValue() {
+ return new IntWritable(0);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.LongWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for getting max long value.
- *
*/
-public class LongMaxAggregator implements Aggregator<LongWritable> {
- /** Saved maximum value */
- private long max = Long.MIN_VALUE;
-
- /**
- * Aggregate with a primitive long.
- *
- * @param value Long value to aggregate.
- */
- public void aggregate(long value) {
- long val = value;
- if (val > max) {
- max = val;
- }
- }
-
+public class LongMaxAggregator extends BasicAggregator<LongWritable> {
@Override
public void aggregate(LongWritable value) {
- long val = value.get();
- if (val > max) {
- max = val;
- }
- }
-
- /**
- * Set aggregated value using a primitive long.
- *
- * @param value Long value to set.
- */
- public void setAggregatedValue(long value) {
- max = value;
- }
-
- @Override
- public void setAggregatedValue(LongWritable value) {
- max = value.get();
- }
-
- @Override
- public LongWritable getAggregatedValue() {
- return new LongWritable(max);
+ getAggregatedValue().set(
+ Math.max(getAggregatedValue().get(), value.get()));
}
@Override
- public LongWritable createAggregatedValue() {
- return new LongWritable();
+ public LongWritable createInitialValue() {
+ return new LongWritable(Long.MIN_VALUE);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.LongWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for getting min long value.
*/
-public class LongMinAggregator implements Aggregator<LongWritable> {
- /** Internal aggregator */
- private long min = Long.MAX_VALUE;
-
- /**
- * Aggregate with a primitive long.
- *
- * @param value Long value to aggregate.
- */
- public void aggregate(long value) {
- long val = value;
- if (val < min) {
- min = val;
- }
- }
-
+public class LongMinAggregator extends BasicAggregator<LongWritable> {
@Override
public void aggregate(LongWritable value) {
- long val = value.get();
- if (val < min) {
- min = val;
- }
- }
-
- /**
- * Set aggregated value using a primitive long.
- *
- * @param value Long value to set.
- */
- public void setAggregatedValue(long value) {
- min = value;
+ getAggregatedValue().set(
+ Math.min(getAggregatedValue().get(), value.get()));
}
@Override
- public void setAggregatedValue(LongWritable value) {
- min = value.get();
+ public LongWritable createInitialValue() {
+ return new LongWritable(Long.MAX_VALUE);
}
-
- @Override
- public LongWritable getAggregatedValue() {
- return new LongWritable(min);
- }
-
- @Override
- public LongWritable createAggregatedValue() {
- return new LongWritable();
- }
-
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,53 +20,20 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.LongWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator that stores a value that is overwritten once another value is
* aggregated. This aggregator is useful for one-to-many communication from
* master.compute() or from a special vertex. In case multiple vertices write
* to this aggregator, its behavior is non-deterministic.
*/
-public class LongOverwriteAggregator implements Aggregator<LongWritable> {
- /** Internal result */
- private long result = 0L;
-
- /**
- * Aggregate with a primitive long.
- *
- * @param value Long value to aggregate.
- */
- public void aggregate(long value) {
- result = value;
- }
-
+public class LongOverwriteAggregator extends BasicAggregator<LongWritable> {
@Override
public void aggregate(LongWritable value) {
- result = value.get();
- }
-
- /**
- * Set aggregated value using a primitive long.
- *
- * @param value Long value to set.
- */
- public void setAggregatedValue(long value) {
- result = value;
- }
-
- @Override
- public void setAggregatedValue(LongWritable value) {
- result = value.get();
- }
-
- @Override
- public LongWritable getAggregatedValue() {
- return new LongWritable(result);
+ getAggregatedValue().set(value.get());
}
@Override
- public LongWritable createAggregatedValue() {
- return new LongWritable();
+ public LongWritable createInitialValue() {
+ return new LongWritable(0);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,50 +20,17 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.LongWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for calculating products of long values.
*/
-public class LongProductAggregator implements Aggregator<LongWritable> {
- /** Internal product */
- private long product = 1L;
-
- /**
- * Aggregate a primitive long.
- *
- * @param value Long value to aggregate.
- */
- public void aggregate(long value) {
- product *= value;
- }
-
+public class LongProductAggregator extends BasicAggregator<LongWritable> {
@Override
public void aggregate(LongWritable value) {
- product *= value.get();
- }
-
- /**
- * Set aggregated value using a primitive long.
- *
- * @param value Long value to set.
- */
- public void setAggregatedValue(long value) {
- product = value;
- }
-
- @Override
- public void setAggregatedValue(LongWritable value) {
- product = value.get();
- }
-
- @Override
- public LongWritable getAggregatedValue() {
- return new LongWritable(product);
+ getAggregatedValue().set(getAggregatedValue().get() * value.get());
}
@Override
- public LongWritable createAggregatedValue() {
- return new LongWritable();
+ public LongWritable createInitialValue() {
+ return new LongWritable(1);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java Thu Aug 9 09:10:57 2012
@@ -20,50 +20,17 @@ package org.apache.giraph.aggregators;
import org.apache.hadoop.io.LongWritable;
-import org.apache.giraph.graph.Aggregator;
-
/**
* Aggregator for summing up long values.
*/
-public class LongSumAggregator implements Aggregator<LongWritable> {
- /** Internal sum */
- private long sum = 0;
-
- /**
- * Aggregate a primitive long.
- *
- * @param value Long value to aggregate.
- */
- public void aggregate(long value) {
- sum += value;
- }
-
+public class LongSumAggregator extends BasicAggregator<LongWritable> {
@Override
public void aggregate(LongWritable value) {
- sum += value.get();
- }
-
- /**
- * Set aggregated value using a primitive long.
- *
- * @param value Long value to set.
- */
- public void setAggregatedValue(long value) {
- sum = value;
- }
-
- @Override
- public void setAggregatedValue(LongWritable value) {
- sum = value.get();
- }
-
- @Override
- public LongWritable getAggregatedValue() {
- return new LongWritable(sum);
+ getAggregatedValue().set(getAggregatedValue().get() + value.get());
}
@Override
- public LongWritable createAggregatedValue() {
- return new LongWritable();
+ public LongWritable createInitialValue() {
+ return new LongWritable(0);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Thu Aug 9 09:10:57 2012
@@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormat
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.WorkerContext;
@@ -115,26 +116,17 @@ public class RandomMessageBenchmark impl
DEFAULT_NUM_MESSAGES_PER_EDGE);
numSupersteps = getContext().getConfiguration().
getInt(SUPERSTEP_COUNT, -1);
- registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
- LongSumAggregator.class);
- registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
- LongSumAggregator.class);
- registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
- LongSumAggregator.class);
- registerAggregator(WORKERS,
- LongSumAggregator.class);
}
@Override
public void preSuperstep() {
- LongSumAggregator superstepBytesAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
- LongSumAggregator superstepMessagesAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
- LongSumAggregator superstepMillisAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
- LongSumAggregator workersAggregator =
- (LongSumAggregator) getAggregator(WORKERS);
+ long superstepBytes = this.<LongWritable>
+ getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get();
+ long superstepMessages = this.<LongWritable>
+ getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get();
+ long superstepMillis = this.<LongWritable>
+ getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get();
+ long workers = this.<LongWritable>getAggregatedValue(WORKERS).get();
// For timing and tracking the supersteps
// - superstep 0 starts the time, but cannot display any stats
@@ -143,41 +135,26 @@ public class RandomMessageBenchmark impl
if (getSuperstep() == 0) {
startSuperstepMillis = System.currentTimeMillis();
} else {
- totalBytes +=
- superstepBytesAggregator.getAggregatedValue().get();
- totalMessages +=
- superstepMessagesAggregator.getAggregatedValue().get();
- totalMillis +=
- superstepMillisAggregator.getAggregatedValue().get();
+ totalBytes += superstepBytes;
+ totalMessages += superstepMessages;
+ totalMillis += superstepMillis;
double superstepMegabytesPerSecond =
- superstepBytesAggregator.getAggregatedValue().get() *
- workersAggregator.getAggregatedValue().get() *
- 1000d / 1024d / 1024d /
- superstepMillisAggregator.getAggregatedValue().get();
+ superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis;
double megabytesPerSecond = totalBytes *
- workersAggregator.getAggregatedValue().get() *
- 1000d / 1024d / 1024d / totalMillis;
+ workers * 1000d / 1024d / 1024d / totalMillis;
double superstepMessagesPerSecond =
- superstepMessagesAggregator.getAggregatedValue().get() *
- workersAggregator.getAggregatedValue().get() * 1000d /
- superstepMillisAggregator.getAggregatedValue().get();
- double messagesPerSecond = totalMessages *
- workersAggregator.getAggregatedValue().get() * 1000d /
- totalMillis;
+ superstepMessages * workers * 1000d / superstepMillis;
+ double messagesPerSecond =
+ totalMessages * workers * 1000d / totalMillis;
if (LOG.isInfoEnabled()) {
- LOG.info("Outputing statistics for superstep " +
- getSuperstep());
- LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " +
- superstepBytesAggregator.getAggregatedValue());
+ LOG.info("Outputing statistics for superstep " + getSuperstep());
+ LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + superstepBytes);
LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
- LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " +
- superstepMessagesAggregator.getAggregatedValue());
+ LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + superstepMessages);
LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
- LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " +
- superstepMillisAggregator.getAggregatedValue());
+ LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + superstepMillis);
LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
- LOG.info(WORKERS + " : " +
- workersAggregator.getAggregatedValue());
+ LOG.info(WORKERS + " : " + workers);
LOG.info("Superstep megabytes / second = " +
superstepMegabytesPerSecond);
LOG.info("Total megabytes / second = " +
@@ -187,41 +164,25 @@ public class RandomMessageBenchmark impl
LOG.info("Total messages / second = " +
messagesPerSecond);
LOG.info("Superstep megabytes / second / worker = " +
- superstepMegabytesPerSecond /
- workersAggregator.getAggregatedValue().get());
+ superstepMegabytesPerSecond / workers);
LOG.info("Total megabytes / second / worker = " +
- megabytesPerSecond /
- workersAggregator.getAggregatedValue().get());
+ megabytesPerSecond / workers);
LOG.info("Superstep messages / second / worker = " +
- superstepMessagesPerSecond /
- workersAggregator.getAggregatedValue().get());
+ superstepMessagesPerSecond / workers);
LOG.info("Total messages / second / worker = " +
- messagesPerSecond /
- workersAggregator.getAggregatedValue().get());
+ messagesPerSecond / workers);
}
}
- superstepBytesAggregator.setAggregatedValue(
- new LongWritable(0L));
- superstepMessagesAggregator.setAggregatedValue(
- new LongWritable(0L));
- workersAggregator.setAggregatedValue(
- new LongWritable(1L));
- useAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
- useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
- useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
- useAggregator(WORKERS);
+ aggregate(WORKERS, new LongWritable(1));
}
@Override
public void postSuperstep() {
- LongSumAggregator superstepMillisAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
long endSuperstepMillis = System.currentTimeMillis();
long superstepMillis = endSuperstepMillis - startSuperstepMillis;
startSuperstepMillis = endSuperstepMillis;
- superstepMillisAggregator.setAggregatedValue(
- new LongWritable(superstepMillis));
+ aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(superstepMillis));
}
@Override
@@ -263,6 +224,26 @@ public class RandomMessageBenchmark impl
}
/**
+ * Master compute associated with {@link RandomMessageBenchmark}.
+ * It registers required aggregators.
+ */
+ public static class RandomMessageBenchmarkMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
+ LongSumAggregator.class);
+ registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
+ LongSumAggregator.class);
+ registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
+ LongSumAggregator.class);
+ registerAggregator(WORKERS,
+ LongSumAggregator.class);
+ }
+ }
+
+ /**
* Actual message computation (messaging in this case)
*/
public static class RandomMessageVertex extends EdgeListVertex<
@@ -271,10 +252,6 @@ public class RandomMessageBenchmark impl
public void compute(Iterable<BytesWritable> messages) {
RandomMessageBenchmarkWorkerContext workerContext =
(RandomMessageBenchmarkWorkerContext) getWorkerContext();
- LongSumAggregator superstepBytesAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
- LongSumAggregator superstepMessagesAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
if (getSuperstep() < workerContext.getNumSupersteps()) {
for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
workerContext.randomizeMessageBytes();
@@ -282,8 +259,9 @@ public class RandomMessageBenchmark impl
new BytesWritable(workerContext.getMessageBytes()));
long bytesSent = workerContext.getMessageBytes().length *
getNumEdges();
- superstepBytesAggregator.aggregate(bytesSent);
- superstepMessagesAggregator.aggregate(getNumEdges());
+ aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
+ aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
+ new LongWritable(getNumEdges()));
}
} else {
voteToHalt();
@@ -377,6 +355,7 @@ public class RandomMessageBenchmark impl
job.setVertexClass(RandomMessageVertex.class);
job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
+ job.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
job.setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Thu Aug 9 09:10:57 2012
@@ -20,7 +20,7 @@ package org.apache.giraph.bsp;
import java.io.IOException;
-import org.apache.giraph.graph.AggregatorUsage;
+import org.apache.giraph.graph.MasterAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.zookeeper.KeeperException;
@@ -35,9 +35,9 @@ import org.apache.zookeeper.KeeperExcept
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public interface CentralizedServiceMaster<
- I extends WritableComparable, V extends Writable, E extends Writable,
- M extends Writable> extends CentralizedService<I, V, E, M>, AggregatorUsage {
+public interface CentralizedServiceMaster<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> extends
+ CentralizedService<I, V, E, M>, MasterAggregatorUsage {
/**
* Become the master.
* @return true if became the master, false if the application is done.
Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Aug 9 09:10:57 2012
@@ -24,10 +24,10 @@ import java.util.List;
import java.util.Map;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.graph.WorkerAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.giraph.graph.AggregatorUsage;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.GraphMapper;
import org.apache.giraph.graph.partition.Partition;
@@ -48,7 +48,7 @@ import org.apache.giraph.graph.WorkerCon
@SuppressWarnings("rawtypes")
public interface CentralizedServiceWorker<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- extends CentralizedService<I, V, E, M>, AggregatorUsage {
+ extends CentralizedService<I, V, E, M>, WorkerAggregatorUsage {
/**
* Get the worker information
*
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java Thu Aug 9 09:10:57 2012
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/** Vertex which uses aggregators and checks their values every superstep. To be used for testing. */
+public class AggregatorsTestVertex extends
+ EdgeListVertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> {
+
+ /** Name of regular aggregator (registered with registerAggregator) */
+ private static final String REGULAR_AGG = "regular";
+ /** Name of persistent aggregator (registered with registerPersistentAggregator) */
+ private static final String PERSISTENT_AGG = "persistent";
+ /** Name of master overwriting aggregator (master calls setAggregatedValue on it) */
+ private static final String MASTER_WRITE_AGG = "master";
+ /** Base value which master compute multiplies by 2^superstep before setting it */
+ private static final long MASTER_VALUE = 12345;
+
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ long superstep = getSuperstep();
+
+ LongWritable myValue = new LongWritable(1L << superstep); // 2^superstep
+ aggregate(REGULAR_AGG, myValue);
+ aggregate(PERSISTENT_AGG, myValue);
+
+ long nv = getTotalNumVertices();
+ if (superstep > 0) {
+ assertEquals(nv * (1L << (superstep - 1)), // nv vertices x 2^(superstep - 1)
+ ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+ } else {
+ assertEquals(0, // superstep 0: expected to read 0
+ ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+ }
+ assertEquals(nv * ((1L << superstep) - 1), // nv x (2^0 + ... + 2^(superstep - 1))
+ ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+ assertEquals(MASTER_VALUE * (1L << superstep), // same formula the master compute below sets
+ ((LongWritable) getAggregatedValue(MASTER_WRITE_AGG)).get());
+
+ if (getSuperstep() == 10) { // vote to halt once superstep 10 has been checked
+ voteToHalt();
+ }
+ }
+
+ /** Master compute which sets one aggregator and checks the other two. To be used for testing. */
+ public static class AggregatorsTestMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void compute() {
+ long superstep = getSuperstep();
+
+ LongWritable myValue =
+ new LongWritable(MASTER_VALUE * (1L << superstep)); // MASTER_VALUE x 2^superstep
+ setAggregatedValue(MASTER_WRITE_AGG, myValue);
+
+ long nv = getTotalNumVertices();
+ if (superstep > 0) {
+ assertEquals(nv * (1L << (superstep - 1)), // nv vertices x 2^(superstep - 1)
+ ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+ } else {
+ assertEquals(0, // superstep 0: expected to read 0
+ ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+ }
+ assertEquals(nv * ((1L << superstep) - 1), // nv x (2^0 + ... + 2^(superstep - 1))
+ ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+ }
+
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(REGULAR_AGG, LongSumAggregator.class);
+ registerPersistentAggregator(PERSISTENT_AGG, // the only one registered as persistent
+ LongSumAggregator.class);
+ registerAggregator(MASTER_WRITE_AGG, LongSumAggregator.class);
+ }
+ }
+
+ /**
+ * Throws a RuntimeException reporting both values if they are not equal.
+ *
+ * @param expected Expected value
+ * @param actual Actual value
+ */
+ private static void assertEquals(long expected, long actual) {
+ if (expected != actual) {
+ throw new RuntimeException("expected: " + expected +
+ ", actual: " + actual);
+ }
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java Thu Aug 9 09:10:57 2012
@@ -19,10 +19,8 @@
package org.apache.giraph.examples;
import java.io.IOException;
-import java.util.Map;
import java.util.Map.Entry;
-import org.apache.giraph.graph.Aggregator;
import org.apache.giraph.graph.AggregatorWriter;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -57,11 +55,11 @@ public class SimpleAggregatorWriter impl
}
@Override
- public void writeAggregator(Map<String, Aggregator<Writable>> map,
+ public void writeAggregator(
+ Iterable<Entry<String, Writable>> aggregatorMap,
long superstep) throws IOException {
-
- for (Entry<String, Aggregator<Writable>> aggregator: map.entrySet()) {
- aggregator.getValue().getAggregatedValue().write(output);
+ for (Entry<String, Writable> entry : aggregatorMap) {
+ entry.getValue().write(output);
}
output.flush();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Thu Aug 9 09:10:57 2012
@@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormat
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
@@ -67,9 +68,6 @@ public class SimpleCheckpointVertex exte
SimpleCheckpointVertexWorkerContext workerContext =
(SimpleCheckpointVertexWorkerContext) getWorkerContext();
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
-
boolean enableFault = workerContext.getEnableFault();
int supersteps = workerContext.getSupersteps();
@@ -86,10 +84,12 @@ public class SimpleCheckpointVertex exte
voteToHalt();
return;
}
- LOG.info("compute: " + sumAggregator);
- sumAggregator.aggregate(getId().get());
- LOG.info("compute: sum = " +
- sumAggregator.getAggregatedValue().get() +
+ long sumAgg = this.<LongWritable>getAggregatedValue(
+ LongSumAggregator.class.getName()).get();
+ LOG.info("compute: " + sumAgg);
+ aggregate(LongSumAggregator.class.getName(),
+ new LongWritable(getId().get()));
+ LOG.info("compute: sum = " + sumAgg +
" for vertex " + getId());
float msgValue = 0.0f;
for (FloatWritable message : messages) {
@@ -139,11 +139,6 @@ public class SimpleCheckpointVertex exte
@Override
public void preApplication()
throws InstantiationException, IllegalAccessException {
- registerAggregator(LongSumAggregator.class.getName(),
- LongSumAggregator.class);
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- sumAggregator.setAggregatedValue(0);
supersteps = getContext().getConfiguration()
.getInt(SUPERSTEP_COUNT, supersteps);
enableFault = getContext().getConfiguration()
@@ -152,15 +147,13 @@ public class SimpleCheckpointVertex exte
@Override
public void postApplication() {
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- FINAL_SUM = sumAggregator.getAggregatedValue().get();
+ FINAL_SUM = this.<LongWritable>getAggregatedValue(
+ LongSumAggregator.class.getName()).get();
LOG.info("FINAL_SUM=" + FINAL_SUM);
}
@Override
public void preSuperstep() {
- useAggregator(LongSumAggregator.class.getName());
}
@Override
@@ -221,6 +214,7 @@ public class SimpleCheckpointVertex exte
bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
+ bspJob.setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class);
int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
@@ -243,6 +237,20 @@ public class SimpleCheckpointVertex exte
}
/**
+ * Master compute associated with {@link SimpleCheckpointVertex}.
+ * It registers the {@link LongSumAggregator} under its class name, the name the rest of this file uses for it.
+ */
+ public static class SimpleCheckpointVertexMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(LongSumAggregator.class.getName(),
+ LongSumAggregator.class);
+ }
+ }
+
+ /**
* Executable from the command line.
*
* @param args Command line args.
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java Thu Aug 9 09:10:57 2012
@@ -19,16 +19,12 @@
package org.apache.giraph.examples;
import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
-import org.apache.giraph.graph.MasterCompute;
import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.log4j.Logger;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
/**
* Demonstrates a computation with a centralized part implemented via a
* MasterCompute.
@@ -42,10 +38,8 @@ public class SimpleMasterComputeVertex e
@Override
public void compute(Iterable<DoubleWritable> messages) {
- DoubleOverwriteAggregator agg =
- (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
double oldSum = getSuperstep() == 0 ? 0 : getValue().get();
- double newValue = agg.getAggregatedValue().get();
+ double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
double newSum = oldSum + newValue;
setValue(new DoubleWritable(newSum));
SimpleMasterComputeWorkerContext workerContext =
@@ -65,12 +59,10 @@ public class SimpleMasterComputeVertex e
@Override
public void preApplication()
throws InstantiationException, IllegalAccessException {
- registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
}
@Override
public void preSuperstep() {
- useAggregator(SMC_AGG);
}
@Override
@@ -94,20 +86,11 @@ public class SimpleMasterComputeVertex e
* MasterCompute used with {@link SimpleMasterComputeVertex}.
*/
public static class SimpleMasterCompute
- extends MasterCompute {
- @Override
- public void write(DataOutput out) throws IOException {
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- }
-
+ extends DefaultMasterCompute {
@Override
public void compute() {
- DoubleOverwriteAggregator agg =
- (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
- agg.aggregate(((double) getSuperstep()) / 2 + 1);
+ setAggregatedValue(SMC_AGG,
+ new DoubleWritable(((double) getSuperstep()) / 2 + 1));
if (getSuperstep() == 10) {
haltComputation();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Aug 9 09:10:57 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.aggregators.Dou
import org.apache.giraph.aggregators.DoubleMinAggregator;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.DefaultMasterCompute;
import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
@@ -55,13 +56,15 @@ public class SimplePageRankVertex extend
/** Logger */
private static final Logger LOG =
Logger.getLogger(SimplePageRankVertex.class);
+ /** Sum aggregator name */
+ private static String SUM_AGG = "sum";
+ /** Min aggregator name */
+ private static String MIN_AGG = "min";
+ /** Max aggregator name */
+ private static String MAX_AGG = "max";
@Override
public void compute(Iterable<DoubleWritable> messages) {
- LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
- DoubleMinAggregator minAggreg = (DoubleMinAggregator) getAggregator("min");
- DoubleMaxAggregator maxAggreg = (DoubleMaxAggregator) getAggregator("max");
-
if (getSuperstep() >= 1) {
double sum = 0;
for (DoubleWritable message : messages) {
@@ -70,12 +73,12 @@ public class SimplePageRankVertex extend
DoubleWritable vertexValue =
new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
setValue(vertexValue);
- maxAggreg.aggregate(vertexValue);
- minAggreg.aggregate(vertexValue);
- sumAggreg.aggregate(1L);
+ aggregate(MAX_AGG, vertexValue);
+ aggregate(MIN_AGG, vertexValue);
+ aggregate(SUM_AGG, new LongWritable(1));
LOG.info(getId() + ": PageRank=" + vertexValue +
- " max=" + maxAggreg.getAggregatedValue() +
- " min=" + minAggreg.getAggregatedValue());
+ " max=" + getAggregatedValue(MAX_AGG) +
+ " min=" + getAggregatedValue(MIN_AGG));
}
if (getSuperstep() < MAX_SUPERSTEPS) {
@@ -114,24 +117,13 @@ public class SimplePageRankVertex extend
@Override
public void preApplication()
throws InstantiationException, IllegalAccessException {
- registerAggregator("sum", LongSumAggregator.class);
- registerAggregator("min", DoubleMinAggregator.class);
- registerAggregator("max", DoubleMaxAggregator.class);
}
@Override
public void postApplication() {
-
- LongSumAggregator sumAggreg =
- (LongSumAggregator) getAggregator("sum");
- DoubleMinAggregator minAggreg =
- (DoubleMinAggregator) getAggregator("min");
- DoubleMaxAggregator maxAggreg =
- (DoubleMaxAggregator) getAggregator("max");
-
- FINAL_SUM = sumAggreg.getAggregatedValue().get();
- FINAL_MAX = maxAggreg.getAggregatedValue().get();
- FINAL_MIN = minAggreg.getAggregatedValue().get();
+ FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
+ FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
+ FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
LOG.info("aggregatedNumVertices=" + FINAL_SUM);
LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
@@ -140,31 +132,21 @@ public class SimplePageRankVertex extend
@Override
public void preSuperstep() {
-
- LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
- DoubleMinAggregator minAggreg =
- (DoubleMinAggregator) getAggregator("min");
- DoubleMaxAggregator maxAggreg =
- (DoubleMaxAggregator) getAggregator("max");
-
if (getSuperstep() >= 3) {
LOG.info("aggregatedNumVertices=" +
- sumAggreg.getAggregatedValue() +
+ getAggregatedValue(SUM_AGG) +
" NumVertices=" + getTotalNumVertices());
- if (sumAggreg.getAggregatedValue().get() != getTotalNumVertices()) {
+ if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
+ getTotalNumVertices()) {
throw new RuntimeException("wrong value of SumAggreg: " +
- sumAggreg.getAggregatedValue() + ", should be: " +
+ getAggregatedValue(SUM_AGG) + ", should be: " +
getTotalNumVertices());
}
- DoubleWritable maxPagerank = maxAggreg.getAggregatedValue();
+ DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
- DoubleWritable minPagerank = minAggreg.getAggregatedValue();
+ DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
LOG.info("aggregatedMinPageRank=" + minPagerank.get());
}
- useAggregator("sum");
- useAggregator("min");
- useAggregator("max");
- sumAggreg.setAggregatedValue(0L);
}
@Override
@@ -172,6 +154,21 @@ public class SimplePageRankVertex extend
}
/**
+ * Master compute associated with {@link SimplePageRankVertex}.
+ * It registers the sum aggregator as a regular aggregator and the min and max aggregators as persistent ones.
+ */
+ public static class SimplePageRankVertexMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(SUM_AGG, LongSumAggregator.class);
+ registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
+ registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
+ }
+ }
+
+ /**
* Simple VertexReader that supports {@link SimplePageRankVertex}
*/
public static class SimplePageRankVertexReader extends
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Thu Aug 9 09:10:57 2012
@@ -19,6 +19,7 @@
package org.apache.giraph.examples;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.WorkerContext;
@@ -116,25 +117,19 @@ public class VerifyMessage {
@Override
public void preApplication() throws InstantiationException,
IllegalAccessException {
- registerAggregator(LongSumAggregator.class.getName(),
- LongSumAggregator.class);
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- sumAggregator.setAggregatedValue(0);
SUPERSTEPS = getContext().getConfiguration().getInt(
SUPERSTEP_COUNT, SUPERSTEPS);
}
@Override
public void postApplication() {
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- FINAL_SUM = sumAggregator.getAggregatedValue().get();
+ LongWritable sumAggregatorValue =
+ getAggregatedValue(LongSumAggregator.class.getName());
+ FINAL_SUM = sumAggregatorValue.get();
}
@Override
public void preSuperstep() {
- useAggregator(LongSumAggregator.class.getName());
}
@Override
@@ -143,19 +138,18 @@ public class VerifyMessage {
@Override
public void compute(Iterable<VerifiableMessage> messages) {
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
+ String sumAggregatorName = LongSumAggregator.class.getName();
if (getSuperstep() > SUPERSTEPS) {
voteToHalt();
return;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("compute: " + sumAggregator);
+ LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
}
- sumAggregator.aggregate(getId().get());
+ aggregate(sumAggregatorName, new LongWritable(getId().get()));
if (LOG.isDebugEnabled()) {
LOG.debug("compute: sum = " +
- sumAggregator.getAggregatedValue().get() +
+ this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
" for vertex " + getId());
}
float msgValue = 0.0f;
@@ -206,4 +200,18 @@ public class VerifyMessage {
}
}
}
+
+ /**
+ * Master compute associated with {@link VerifyMessageVertex}.
+ * It registers the {@link LongSumAggregator} under its class name, the name the rest of this file uses for it.
+ */
+ public static class VerifyMessageMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(LongSumAggregator.class.getName(),
+ LongSumAggregator.class);
+ }
+ }
}