You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:20 UTC
[37/47] Reduce/broadcast API
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
new file mode 100644
index 0000000..9f821b4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Object responsible for performing a reduce operation.
+ * Simple wrapper of a ReduceOperation object and the current value holding
+ * the partially reduced result.
+ *
+ * @param <S> Single value type, objects passed on workers
+ * @param <R> Reduced value type
+ */
+public class Reducer<S, R extends Writable> implements Writable {
+ /** Reduce operation that defines how values are combined */
+ private ReduceOperation<S, R> reduceOp;
+ /** Current (partially) reduced value */
+ private R currentValue;
+
+ /**
+ * Constructor leaving both fields unset; readFields() fills them in
+ */
+ public Reducer() {
+ }
+ /**
+ * Constructor starting from the initial value created by the operation
+ * @param reduceOp Reduce operations
+ */
+ public Reducer(ReduceOperation<S, R> reduceOp) {
+ this.reduceOp = reduceOp;
+ this.currentValue = reduceOp.createInitialValue();
+ }
+ /**
+ * Constructor starting from an already existing reduced value
+ * @param reduceOp Reduce operations
+ * @param currentValue current reduced value
+ */
+ public Reducer(ReduceOperation<S, R> reduceOp, R currentValue) {
+ this.reduceOp = reduceOp;
+ this.currentValue = currentValue;
+ }
+
+ /**
+ * Reduce given single value into the current reduced value.
+ * @param valueToReduce Single value to reduce
+ */
+ public void reduceSingle(S valueToReduce) {
+ reduceOp.reduceSingle(currentValue, valueToReduce);
+ }
+ /**
+ * Reduce given partially reduced value into the current reduced value.
+ * @param valueToReduce Partial value to reduce
+ */
+ public void reducePartial(R valueToReduce) {
+ reduceOp.reducePartial(currentValue, valueToReduce);
+ }
+ /**
+ * Return new initial reduced value, as created by the reduce operation.
+ * @return New initial reduced value
+ */
+ public R createInitialValue() {
+ return reduceOp.createInitialValue();
+ }
+
+ public ReduceOperation<S, R> getReduceOp() {
+ return reduceOp;
+ }
+
+ public R getCurrentValue() {
+ return currentValue;
+ }
+
+ public void setCurrentValue(R currentValue) {
+ this.currentValue = currentValue;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeWritableObject(reduceOp, out);
+ currentValue.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ reduceOp = WritableUtils.readWritableObject(in, null);
+ currentValue = reduceOp.createInitialValue();
+ currentValue.readFields(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
new file mode 100644
index 0000000..eeefdeb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph reducers.
+ */
+package org.apache.giraph.reducers;
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 5e046cc..3d654b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,8 +17,12 @@
*/
package org.apache.giraph.utils;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
+
+import java.io.IOException;
+import java.util.List;
+
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -29,6 +33,7 @@ import org.apache.giraph.Algorithm;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.GiraphTypes;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -57,11 +62,8 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooKeeper;
-import java.io.IOException;
-import java.util.List;
-
-import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
/**
* Translate command line args into Configuration Key-Value pairs.
@@ -147,6 +149,10 @@ public final class ConfigurationUtils {
ImmutableClassesGiraphConfiguration configuration) {
if (configuration != null) {
configuration.configureIfPossible(object);
+ } else if (object instanceof GiraphConfigurationSettable) {
+ throw new IllegalArgumentException(
+ "Trying to configure configurable object without value, " +
+ object.getClass());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 3c5cbad..923d369 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -739,4 +739,38 @@ public class WritableUtils {
}
}
+ /**
+ * Create a copy of Writable object, by serializing and deserializing it.
+ * Throws RuntimeException on unread bytes, IllegalStateException on IO error.
+ * @param reusableOut Reusable output stream to serialize into (reset first)
+ * @param reusableIn Reusable input stream to deserialize out of (re-initialized)
+ * @param original Original value of which to make a copy
+ * @param <T> Type of the object
+ * @return Copy of the original value
+ */
+ public static <T extends Writable> T createCopy(
+ UnsafeByteArrayOutputStream reusableOut,
+ UnsafeReusableByteArrayInput reusableIn, T original) {
+ T copy = (T) createWritable(original.getClass(), null);
+
+ try {
+ reusableOut.reset();
+ original.write(reusableOut);
+ reusableIn.initialize(
+ reusableOut.getByteArray(), 0, reusableOut.getPos());
+ copy.readFields(reusableIn);
+
+ if (reusableIn.available() != 0) {
+ throw new RuntimeException("Serialization of " +
+ original.getClass() + " encountered issues, " +
+ reusableIn.available() + " bytes left to be read");
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "IOException occurred while trying to create a copy " +
+ original.getClass(), e);
+ }
+ return copy;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 120678f..f61e817 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -18,6 +18,27 @@
package org.apache.giraph.worker;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import net.iharder.Base64;
+
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -66,10 +87,10 @@ import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.partition.WorkerGraphPartitioner;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
@@ -96,26 +117,6 @@ import org.json.JSONObject;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import net.iharder.Base64;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
/**
* ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
@@ -162,10 +163,10 @@ public class BspServiceWorker<I extends WritableComparable,
private final WorkerContext workerContext;
/** Handler for aggregators */
- private final WorkerAggregatorHandler aggregatorHandler;
+ private final WorkerAggregatorHandler globalCommHandler;
/** Superstep output */
- private SuperstepOutput<I, V, E> superstepOutput;
+ private final SuperstepOutput<I, V, E> superstepOutput;
/** array of observers to call back to */
private final WorkerObserver[] observers;
@@ -212,10 +213,10 @@ public class BspServiceWorker<I extends WritableComparable,
workerAggregatorRequestProcessor =
new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
- aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
+ globalCommHandler = new WorkerAggregatorHandler(this, conf, context);
workerContext = conf.createWorkerContext();
- workerContext.setWorkerAggregatorUsage(aggregatorHandler);
+ workerContext.setWorkerGlobalCommUsage(globalCommHandler);
superstepOutput = conf.createSuperstepOutput(context);
@@ -584,7 +585,7 @@ public class BspServiceWorker<I extends WritableComparable,
// Initialize aggregator at worker side during setup.
// Do this just before vertex and edge loading.
- aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+ globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
VertexEdgeCount vertexEdgeCount;
long entriesLoaded;
@@ -895,7 +896,7 @@ public class BspServiceWorker<I extends WritableComparable,
postSuperstepCallbacks();
}
- aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+ globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
MessageStore<I, Writable> incomingMessageStore =
getServerData().getIncomingMessageStore();
@@ -1920,15 +1921,16 @@ else[HADOOP_NON_SECURE]*/
return workerServer.getServerData();
}
+
@Override
public WorkerAggregatorHandler getAggregatorHandler() {
- return aggregatorHandler;
+ return globalCommHandler;
}
@Override
public void prepareSuperstep() {
if (getSuperstep() != INPUT_SUPERSTEP) {
- aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+ globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 35ad94b..89f74b3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -18,6 +18,8 @@
package org.apache.giraph.worker;
+import java.io.IOException;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.VertexEdgeCount;
@@ -37,8 +39,6 @@ import org.apache.log4j.Logger;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
-import java.io.IOException;
-
/**
* Load as many edge input splits as possible.
* Every thread will has its own instance of WorkerClientRequestProcessor
@@ -62,7 +62,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
EdgeInputSplitsCallable.class);
/** Aggregator handler */
- private final WorkerThreadAggregatorUsage aggregatorUsage;
+ private final WorkerThreadGlobalCommUsage globalCommUsage;
/** Bsp service worker (only use thread-safe methods) */
private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Edge input format */
@@ -105,7 +105,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
this.bspServiceWorker = bspServiceWorker;
inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
// Initialize aggregator usage.
- this.aggregatorUsage = bspServiceWorker.getAggregatorHandler()
+ this.globalCommUsage = bspServiceWorker.getAggregatorHandler()
.newThreadAggregatorUsage();
edgeInputFilter = configuration.getEdgeInputFilter();
canEmbedInIds = bspServiceWorker
@@ -147,7 +147,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
edgeReader.initialize(inputSplit, context);
// Set aggregator usage to edge reader
- edgeReader.setWorkerAggregatorUse(aggregatorUsage);
+ edgeReader.setWorkerGlobalCommUsage(globalCommUsage);
long inputSplitEdgesLoaded = 0;
long inputSplitEdgesFiltered = 0;
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
index a2279a9..f6dca25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -18,21 +18,21 @@
package org.apache.giraph.worker;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.MappingReader;
-import org.apache.giraph.mapping.MappingStore;
import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.mapping.MappingStore;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Load as many mapping input splits as possible.
* Every thread will has its own instance of WorkerClientRequestProcessor
@@ -89,11 +89,11 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
mappingInputFormat.createMappingReader(inputSplit, context);
mappingReader.setConf(configuration);
- WorkerThreadAggregatorUsage aggregatorUsage = this.bspServiceWorker
+ WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker
.getAggregatorHandler().newThreadAggregatorUsage();
mappingReader.initialize(inputSplit, context);
- mappingReader.setWorkerAggregatorUse(aggregatorUsage);
+ mappingReader.setWorkerGlobalCommUsage(globalCommUsage);
int entriesLoaded = 0;
MappingStore<I, B> mappingStore =
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 4c85765..00a2781 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -18,6 +18,8 @@
package org.apache.giraph.worker;
+import java.io.IOException;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
@@ -42,8 +44,6 @@ import org.apache.log4j.Logger;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
-import java.io.IOException;
-
/**
* Load as many vertex input splits as possible.
* Every thread will has its own instance of WorkerClientRequestProcessor
@@ -79,7 +79,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
* Whether the chosen {@link OutEdges} implementation allows for Edge
* reuse.
*/
- private boolean reuseEdgeObjects;
+ private final boolean reuseEdgeObjects;
/** Used to translate Edges during vertex input phase based on localData */
private final TranslateEdge<I, E> translateEdge;
@@ -152,13 +152,13 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
vertexInputFormat.createVertexReader(inputSplit, context);
vertexReader.setConf(configuration);
- WorkerThreadAggregatorUsage aggregatorUsage =
+ WorkerThreadGlobalCommUsage globalCommUsage =
this.bspServiceWorker
.getAggregatorHandler().newThreadAggregatorUsage();
vertexReader.initialize(inputSplit, context);
// Set aggregator usage to vertex reader
- vertexReader.setWorkerAggregatorUse(aggregatorUsage);
+ vertexReader.setWorkerGlobalCommUsage(globalCommUsage);
long inputSplitVerticesLoaded = 0;
long inputSplitVerticesFiltered = 0;
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
new file mode 100644
index 0000000..5238a07
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Class for delegating WorkerAggregatorUsage and WorkerGlobalCommUsage
+ * methods to the WorkerGlobalCommUsage set via setWorkerGlobalCommUsage.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public abstract class WorkerAggregatorDelegator<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements WorkerAggregatorUsage, WorkerGlobalCommUsage {
+
+ /** Worker global communication usage that all calls are delegated to */
+ private WorkerGlobalCommUsage workerGlobalCommUsage;
+
+ /**
+ * Set worker global communication usage
+ *
+ * @param workerGlobalCommUsage Worker global communication usage
+ */
+ public void setWorkerGlobalCommUsage(
+ WorkerGlobalCommUsage workerGlobalCommUsage) {
+ this.workerGlobalCommUsage = workerGlobalCommUsage;
+ }
+
+ @Override
+ public final void reduce(String name, Object value) {
+ workerGlobalCommUsage.reduce(name, value);
+ }
+
+ @Override
+ public final <B extends Writable> B getBroadcast(String name) {
+ return workerGlobalCommUsage.getBroadcast(name);
+ }
+
+ @Override
+ public final <A extends Writable> void aggregate(String name, A value) {
+ reduce(name, value);
+ }
+
+ @Override
+ public final <A extends Writable> A getAggregatedValue(String name) {
+ return this.<A>getBroadcast(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 45ca665..05a13a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -15,22 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.giraph.worker;
import java.io.IOException;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
-import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
+import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.comm.aggregators.GlobalCommValueOutputStream;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.Factory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
@@ -38,35 +42,18 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-/**
- * Handler for aggregators on worker. Provides the aggregated values and
- * performs aggregations from user vertex code (thread-safe). Also has
- * methods for all superstep coordination related to aggregators.
- *
- * At the beginning of any superstep any worker calls prepareSuperstep(),
- * which blocks until the final aggregates from the previous superstep have
- * been delivered to the worker.
- * Next, during the superstep worker can call aggregate() and
- * getAggregatedValue() (both methods are thread safe) the former
- * computes partial aggregates for this superstep from the worker,
- * the latter returns (read-only) final aggregates from the previous superstep.
- * Finally, at the end of the superstep, the worker calls finishSuperstep(),
- * which propagates non-owned partial aggregates to the owner workers,
- * and sends the final aggregate from the owner worker to the master.
- */
-public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
+/** Handler for reduce/broadcast on the workers */
+public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(WorkerAggregatorHandler.class);
- /** Map of values from previous superstep */
- private final Map<String, Writable> previousAggregatedValueMap =
+ /** Map of broadcasted values */
+ private final Map<String, Writable> broadcastedMap =
Maps.newHashMap();
- /** Map of aggregator factories for current superstep */
- private final Map<String, Factory<Aggregator<Writable>>>
- currentAggregatorFactoryMap = Maps.newHashMap();
- /** Map of aggregators for current superstep */
- private final Map<String, Aggregator<Writable>> currentAggregatorMap =
+ /** Map of reducers currently being reduced */
+ private final Map<String, Reducer<Object, Writable>> reducerMap =
Maps.newHashMap();
+
/** Service worker */
private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
/** Progressable for reporting progress */
@@ -96,29 +83,48 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
}
@Override
- public <A extends Writable> void aggregate(String name, A value) {
- Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
- if (aggregator != null) {
+ public <B extends Writable> B getBroadcast(String name) {
+ B value = (B) broadcastedMap.get(name);
+ if (value == null) {
+ LOG.warn("getBroadcast: " +
+ AggregatorUtils.getUnregisteredAggregatorMessage(name,
+ broadcastedMap.size() != 0, conf));
+ }
+ return value;
+ }
+
+ @Override
+ public void reduce(String name, Object value) {
+ Reducer<Object, Writable> reducer = reducerMap.get(name);
+ if (reducer != null) {
progressable.progress();
- synchronized (aggregator) {
- aggregator.aggregate(value);
+ synchronized (reducer) {
+ reducer.reduceSingle(value);
}
} else {
- throw new IllegalStateException("aggregate: " +
+ throw new IllegalStateException("reduce: " +
AggregatorUtils.getUnregisteredAggregatorMessage(name,
- currentAggregatorMap.size() != 0, conf));
+ reducerMap.size() != 0, conf));
}
}
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- A value = (A) previousAggregatedValueMap.get(name);
- if (value == null) {
- LOG.warn("getAggregatedValue: " +
+ /**
+ * Combine partially reduced value into currently reduced value.
+ * @param name Name of the reducer
+ * @param valueToReduce Partial value to reduce
+ */
+ protected void reducePartial(String name, Writable valueToReduce) {
+ Reducer<Object, Writable> reducer = reducerMap.get(name);
+ if (reducer != null) {
+ progressable.progress();
+ synchronized (reducer) {
+ reducer.reducePartial(valueToReduce);
+ }
+ } else {
+ throw new IllegalStateException("reducePartial: " +
AggregatorUtils.getUnregisteredAggregatorMessage(name,
- previousAggregatedValueMap.size() != 0, conf));
+ reducerMap.size() != 0, conf));
}
- return value;
}
/**
@@ -128,53 +134,35 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
*/
public void prepareSuperstep(
WorkerAggregatorRequestProcessor requestProcessor) {
+ broadcastedMap.clear();
+ reducerMap.clear();
+
if (LOG.isDebugEnabled()) {
LOG.debug("prepareSuperstep: Start preparing aggregators");
}
- AllAggregatorServerData allAggregatorData =
+ AllAggregatorServerData allGlobalCommData =
serviceWorker.getServerData().getAllAggregatorData();
// Wait for my aggregators
Iterable<byte[]> dataToDistribute =
- allAggregatorData.getDataFromMasterWhenReady(
+ allGlobalCommData.getDataFromMasterWhenReady(
serviceWorker.getMasterInfo());
try {
// Distribute my aggregators
- requestProcessor.distributeAggregators(dataToDistribute);
+ requestProcessor.distributeReducedValues(dataToDistribute);
} catch (IOException e) {
throw new IllegalStateException("prepareSuperstep: " +
"IOException occurred while trying to distribute aggregators", e);
}
// Wait for all other aggregators and store them
- allAggregatorData.fillNextSuperstepMapsWhenReady(
- getOtherWorkerIdsSet(), previousAggregatedValueMap,
- currentAggregatorFactoryMap);
- fillAndInitAggregatorsMap(currentAggregatorMap);
- allAggregatorData.reset();
+ allGlobalCommData.fillNextSuperstepMapsWhenReady(
+ getOtherWorkerIdsSet(), broadcastedMap,
+ reducerMap);
if (LOG.isDebugEnabled()) {
LOG.debug("prepareSuperstep: Aggregators prepared");
}
}
/**
- * Fills aggregators map from currentAggregatorFactoryMap.
- * All aggregators in this map will be set to initial value.
- * @param aggregatorMap Map to fill.
- */
- private void fillAndInitAggregatorsMap(
- Map<String, Aggregator<Writable>> aggregatorMap) {
- for (Map.Entry<String, Factory<Aggregator<Writable>>> entry :
- currentAggregatorFactoryMap.entrySet()) {
- Aggregator<Writable> aggregator =
- aggregatorMap.get(entry.getKey());
- if (aggregator == null) {
- aggregatorMap.put(entry.getKey(), entry.getValue().create());
- } else {
- aggregator.reset();
- }
- }
- }
-
- /**
* Send aggregators to their owners and in the end to the master
*
* @param requestProcessor Request processor for aggregators
@@ -186,19 +174,19 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
"workers will send their aggregated values " +
"once they are done with superstep computation");
}
- OwnerAggregatorServerData ownerAggregatorData =
+ OwnerAggregatorServerData ownerGlobalCommData =
serviceWorker.getServerData().getOwnerAggregatorData();
// First send partial aggregated values to their owners and determine
// which aggregators belong to this worker
- for (Map.Entry<String, Aggregator<Writable>> entry :
- currentAggregatorMap.entrySet()) {
+ for (Map.Entry<String, Reducer<Object, Writable>> entry :
+ reducerMap.entrySet()) {
try {
- boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
- entry.getValue().getAggregatedValue());
+ boolean sent = requestProcessor.sendReducedValue(entry.getKey(),
+ entry.getValue().getCurrentValue());
if (!sent) {
// If it's my aggregator, add it directly
- ownerAggregatorData.aggregate(entry.getKey(),
- entry.getValue().getAggregatedValue());
+ ownerGlobalCommData.reduce(entry.getKey(),
+ entry.getValue().getCurrentValue());
}
} catch (IOException e) {
throw new IllegalStateException("finishSuperstep: " +
@@ -216,20 +204,21 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
}
// Wait to receive partial aggregated values from all other workers
- Iterable<Map.Entry<String, Writable>> myAggregators =
- ownerAggregatorData.getMyAggregatorValuesWhenReady(
+ Iterable<Map.Entry<String, Writable>> myReducedValues =
+ ownerGlobalCommData.getMyReducedValuesWhenReady(
getOtherWorkerIdsSet());
// Send final aggregated values to master
- AggregatedValueOutputStream aggregatorOutput =
- new AggregatedValueOutputStream();
- for (Map.Entry<String, Writable> entry : myAggregators) {
+ GlobalCommValueOutputStream globalOutput =
+ new GlobalCommValueOutputStream(false);
+ for (Map.Entry<String, Writable> entry : myReducedValues) {
try {
- int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
+ int currentSize = globalOutput.addValue(entry.getKey(),
+ GlobalCommType.REDUCED_VALUE,
entry.getValue());
if (currentSize > maxBytesPerAggregatorRequest) {
- requestProcessor.sendAggregatedValuesToMaster(
- aggregatorOutput.flush());
+ requestProcessor.sendReducedValuesToMaster(
+ globalOutput.flush());
}
progressable.progress();
} catch (IOException e) {
@@ -239,7 +228,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
}
}
try {
- requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
+ requestProcessor.sendReducedValuesToMaster(globalOutput.flush());
} catch (IOException e) {
throw new IllegalStateException("finishSuperstep: " +
"IOException occured while sending aggregators to master", e);
@@ -247,7 +236,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
// Wait for master to receive aggregated values before proceeding
serviceWorker.getWorkerClient().waitAllRequests();
- ownerAggregatorData.reset();
+ ownerGlobalCommData.reset();
if (LOG.isDebugEnabled()) {
LOG.debug("finishSuperstep: Aggregators finished");
}
@@ -259,9 +248,9 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
*
* @return New aggregator usage
*/
- public WorkerThreadAggregatorUsage newThreadAggregatorUsage() {
+ public WorkerThreadGlobalCommUsage newThreadAggregatorUsage() {
if (AggregatorUtils.useThreadLocalAggregators(conf)) {
- return new ThreadLocalWorkerAggregatorUsage();
+ return new ThreadLocalWorkerGlobalCommUsage();
} else {
return this;
}
@@ -290,56 +279,70 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
}
/**
- * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
- * We can use one instance of this object per thread to prevent
- * synchronizing on each aggregate() call. In the end of superstep,
- * values from each of these will be aggregated back to {@link
- * WorkerAggregatorHandler}
- */
- public class ThreadLocalWorkerAggregatorUsage
- implements WorkerThreadAggregatorUsage {
- /** Thread-local aggregator map */
- private final Map<String, Aggregator<Writable>> threadAggregatorMap;
+ * Not thread-safe implementation of {@link WorkerThreadGlobalCommUsage}.
+ * We can use one instance of this object per thread to prevent
+ * synchronizing on each reduce() call. At the end of the superstep,
+ * values from each of these will be reduced back to {@link
+ * WorkerAggregatorHandler}
+ */
+ public class ThreadLocalWorkerGlobalCommUsage
+ implements WorkerThreadGlobalCommUsage {
+ /** Thread-local reducer map */
+ private final Map<String, Reducer<Object, Writable>> threadReducerMap;
/**
- * Constructor
- *
- * Creates new instances of all aggregators from
- * {@link WorkerAggregatorHandler}
- */
- public ThreadLocalWorkerAggregatorUsage() {
- threadAggregatorMap = Maps.newHashMapWithExpectedSize(
- WorkerAggregatorHandler.this.currentAggregatorMap.size());
- fillAndInitAggregatorsMap(threadAggregatorMap);
+ * Constructor
+ *
+ * Creates new instances of all reducers from
+ * {@link WorkerAggregatorHandler}
+ */
+ public ThreadLocalWorkerGlobalCommUsage() {
+ threadReducerMap = Maps.newHashMapWithExpectedSize(
+ WorkerAggregatorHandler.this.reducerMap.size());
+
+ UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
+ UnsafeReusableByteArrayInput in = new UnsafeReusableByteArrayInput();
+
+ for (Entry<String, Reducer<Object, Writable>> entry :
+ reducerMap.entrySet()) {
+ ReduceOperation<Object, Writable> globalReduceOp =
+ entry.getValue().getReduceOp();
+
+ ReduceOperation<Object, Writable> threadLocalCopy =
+ WritableUtils.createCopy(out, in, globalReduceOp);
+
+ threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy));
+ }
}
@Override
- public <A extends Writable> void aggregate(String name, A value) {
- Aggregator<Writable> aggregator = threadAggregatorMap.get(name);
- if (aggregator != null) {
+ public void reduce(String name, Object value) {
+ Reducer<Object, Writable> reducer = threadReducerMap.get(name);
+ if (reducer != null) {
progressable.progress();
- aggregator.aggregate(value);
+ reducer.reduceSingle(value);
} else {
- throw new IllegalStateException("aggregate: " +
+ throw new IllegalStateException("reduce: " +
AggregatorUtils.getUnregisteredAggregatorMessage(name,
- threadAggregatorMap.size() != 0, conf));
+ threadReducerMap.size() != 0, conf));
}
}
@Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return WorkerAggregatorHandler.this.<A>getAggregatedValue(name);
+ public <B extends Writable> B getBroadcast(String name) {
+ return WorkerAggregatorHandler.this.getBroadcast(name);
}
@Override
public void finishThreadComputation() {
// Aggregate the values this thread's vertices provided back to
// WorkerAggregatorHandler
- for (Map.Entry<String, Aggregator<Writable>> entry :
- threadAggregatorMap.entrySet()) {
- WorkerAggregatorHandler.this.aggregate(entry.getKey(),
- entry.getValue().getAggregatedValue());
+ for (Entry<String, Reducer<Object, Writable>> entry :
+ threadReducerMap.entrySet()) {
+ WorkerAggregatorHandler.this.reducePartial(entry.getKey(),
+ entry.getValue().getCurrentValue());
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index 7a55d56..b977ea1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -18,30 +18,28 @@
package org.apache.giraph.worker;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.graph.GraphState;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
/**
* WorkerContext allows for the execution of user code
* on a per-worker basis. There's one WorkerContext per worker.
*/
@SuppressWarnings("rawtypes")
public abstract class WorkerContext
- extends DefaultImmutableClassesGiraphConfigurable
- implements WorkerAggregatorUsage, Writable {
+ extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable>
+ implements Writable {
/** Global graph state */
private GraphState graphState;
- /** Worker aggregator usage */
- private WorkerAggregatorUsage workerAggregatorUsage;
/** Service worker */
private CentralizedServiceWorker serviceWorker;
@@ -71,16 +69,6 @@ public abstract class WorkerContext
}
/**
- * Set worker aggregator usage
- *
- * @param workerAggregatorUsage Worker aggregator usage
- */
- public void setWorkerAggregatorUsage(
- WorkerAggregatorUsage workerAggregatorUsage) {
- this.workerAggregatorUsage = workerAggregatorUsage;
- }
-
- /**
* Initialize the WorkerContext.
* This method is executed once on each Worker before the first
* superstep starts.
@@ -196,16 +184,6 @@ public abstract class WorkerContext
return graphState.getContext();
}
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- workerAggregatorUsage.aggregate(name, value);
- }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return workerAggregatorUsage.<A>getAggregatedValue(name);
- }
-
/**
* Call this to log a line to command line of the job. Use in moderation -
* it's a synchronous call to Job client
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
new file mode 100644
index 0000000..39566f5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Methods on worker can access broadcasted values and provide
+ * values to reduce through this interface
+ */
+public interface WorkerGlobalCommUsage {
+ /**
+ * Reduce given value.
+ * @param name Name of the reducer
+ * @param value Single value to reduce
+ */
+ void reduce(String name, Object value);
+ /**
+ * Get value broadcasted from master
+ * @param name Name of the broadcasted value
+ * @return Broadcasted value
+ * @param <B> Broadcast value type
+ */
+ <B extends Writable> B getBroadcast(String name);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
deleted file mode 100644
index 194127e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-/**
- * {@link WorkerAggregatorUsage} which can be used in each of the
- * computation threads.
- */
-public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage {
- /**
- * Call this after thread's computation is finished,
- * i.e. when all vertices have provided their values to aggregators
- */
- void finishThreadComputation();
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
new file mode 100644
index 0000000..8edbdc7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+
+/**
+ * {@link WorkerGlobalCommUsage} which can be used in each of the
+ * computation threads.
+ */
+public interface WorkerThreadGlobalCommUsage extends WorkerGlobalCommUsage {
+ /**
+ * Call this after thread's computation is finished,
+ * i.e. when all vertices have provided their values to reducers
+ */
+ void finishThreadComputation();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 488e1ea..26459c0 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -18,6 +18,24 @@
package org.apache.giraph;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.combiner.SimpleSumMessageCombiner;
import org.apache.giraph.conf.GiraphConfiguration;
@@ -62,24 +80,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Unit test for many simple BSP applications.
*/
@@ -456,8 +456,8 @@ public class
assertEquals(maxSuperstep + 2, maxValues.size());
assertEquals(maxSuperstep + 2, vertexCounts.size());
- assertEquals(maxPageRank, (double) maxValues.get(maxSuperstep), 0d);
- assertEquals(minPageRank, (double) minValues.get(maxSuperstep), 0d);
+ assertEquals(maxPageRank, maxValues.get(maxSuperstep), 0d);
+ assertEquals(minPageRank, minValues.get(maxSuperstep), 0d);
assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep));
} finally {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index eb3f686..602ab32 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -18,34 +18,19 @@
package org.apache.giraph.aggregators;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
import org.apache.giraph.BspCase;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.examples.AggregatorsTestComputation;
import org.apache.giraph.examples.SimpleCheckpoint;
import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.master.MasterAggregatorHandler;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
/** Tests if aggregators are handled on a proper way */
public class TestAggregatorsHandling extends BspCase {
@@ -54,21 +39,6 @@ public class TestAggregatorsHandling extends BspCase {
super(TestAggregatorsHandling.class.getName());
}
- private Map<String, AggregatorWrapper<Writable>> getAggregatorMap
- (MasterAggregatorHandler aggregatorHandler) {
- try {
- Field aggregtorMapField = aggregatorHandler.getClass().getDeclaredField
- ("aggregatorMap");
- aggregtorMapField.setAccessible(true);
- return (Map<String, AggregatorWrapper<Writable>>)
- aggregtorMapField.get(aggregatorHandler);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException(e);
- } catch (NoSuchFieldException e) {
- throw new IllegalStateException(e);
- }
- }
-
/** Tests if aggregators are handled on a proper way during supersteps */
@Test
public void testAggregatorsHandling() throws IOException,
@@ -88,64 +58,6 @@ public class TestAggregatorsHandling extends BspCase {
assertTrue(job.run(true));
}
- /** Test if aggregators serialization captures everything */
- @Test
- public void testMasterAggregatorsSerialization() throws
- IllegalAccessException, InstantiationException, IOException {
- ImmutableClassesGiraphConfiguration conf =
- Mockito.mock(ImmutableClassesGiraphConfiguration.class);
- Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
- TextAggregatorWriter.class);
- Progressable progressable = Mockito.mock(Progressable.class);
- MasterAggregatorHandler handler =
- new MasterAggregatorHandler(conf, progressable);
-
- String regularAggName = "regular";
- LongWritable regularValue = new LongWritable(5);
- handler.registerAggregator(regularAggName, LongSumAggregator.class);
- handler.setAggregatedValue(regularAggName, regularValue);
-
- String persistentAggName = "persistent";
- DoubleWritable persistentValue = new DoubleWritable(10.5);
- handler.registerPersistentAggregator(persistentAggName,
- DoubleOverwriteAggregator.class);
- handler.setAggregatedValue(persistentAggName, persistentValue);
-
- for (AggregatorWrapper<Writable> aggregator :
- getAggregatorMap(handler).values()) {
- aggregator.setPreviousAggregatedValue(
- aggregator.getCurrentAggregatedValue());
- }
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- handler.write(new DataOutputStream(out));
-
- MasterAggregatorHandler restartedHandler =
- new MasterAggregatorHandler(conf, progressable);
- restartedHandler.readFields(
- new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
-
- assertEquals(2, getAggregatorMap(restartedHandler).size());
-
- AggregatorWrapper<Writable> regularAgg =
- getAggregatorMap(restartedHandler).get(regularAggName);
- assertTrue(regularAgg.getAggregatorFactory().create().getClass().equals(
- LongSumAggregator.class));
- assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
- assertEquals(regularValue,
- restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
- assertFalse(regularAgg.isPersistent());
-
- AggregatorWrapper<Writable> persistentAgg =
- getAggregatorMap(restartedHandler).get(persistentAggName);
- assertTrue(persistentAgg.getAggregatorFactory().create().getClass().equals
- (DoubleOverwriteAggregator.class));
- assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
- assertEquals(persistentValue,
- restartedHandler.<LongWritable>getAggregatedValue(persistentAggName));
- assertTrue(persistentAgg.isPersistent());
- }
-
/**
* Test if aggregators are are handled properly when restarting from a
* checkpoint