You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/10/10 00:10:22 UTC
[2/3] Reduce/broadcast API
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
new file mode 100644
index 0000000..7171f04
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import java.io.IOException;
+
+import org.apache.giraph.master.MasterAggregatorHandler;
+
+/**
+ * Request to send final reduced values from the worker which owns
+ * them to the master
+ */
+public class SendReducedToMasterRequest extends ByteArrayRequest
+ implements MasterRequest {
+
+ /**
+ * Constructor
+ *
+ * @param data Serialized aggregator data
+ */
+ public SendReducedToMasterRequest(byte[] data) {
+ super(data);
+ }
+
+ /**
+ * Constructor used for reflection only
+ */
+ public SendReducedToMasterRequest() {
+ }
+
+ @Override
+ public void doRequest(MasterAggregatorHandler aggregatorHandler) {
+ try {
+ aggregatorHandler.acceptReducedValues(getDataInput());
+ } catch (IOException e) {
+ throw new IllegalStateException("doRequest: " +
+ "IOException occurred while processing request", e);
+ }
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
index 00a0c26..2f76e6e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
@@ -18,15 +18,15 @@
package org.apache.giraph.comm.requests;
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.IOException;
-
/**
* Request to send partial aggregated values for current superstep (values
* which were computed by one worker's vertices)
@@ -56,20 +56,23 @@ public class SendWorkerAggregatorsRequest extends
OwnerAggregatorServerData aggregatorData =
serverData.getOwnerAggregatorData();
try {
- int numAggregators = input.readInt();
- for (int i = 0; i < numAggregators; i++) {
- String aggregatorName = input.readUTF();
- if (aggregatorName.equals(
- AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
- LongWritable count = new LongWritable(0);
- count.readFields(input);
- aggregatorData.receivedRequestCountFromWorker(count.get(),
+ int num = input.readInt();
+ for (int i = 0; i < num; i++) {
+ String name = input.readUTF();
+ GlobalCommType type = GlobalCommType.values()[input.readByte()];
+ if (type == GlobalCommType.SPECIAL_COUNT) {
+ LongWritable value = new LongWritable();
+ value.readFields(input);
+ aggregatorData.receivedRequestCountFromWorker(
+ value.get(),
getSenderTaskId());
+ } else if (type == GlobalCommType.REDUCED_VALUE) {
+ Writable value = aggregatorData.createInitialValue(name);
+ value.readFields(input);
+ aggregatorData.reduce(name, value);
} else {
- Writable aggregatedValue =
- aggregatorData.createAggregatorInitialValue(aggregatorName);
- aggregatedValue.readFields(input);
- aggregatorData.aggregate(aggregatorName, aggregatedValue);
+ throw new IllegalStateException(
+ "SendWorkerAggregatorsRequest received " + type);
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
index e7c3084..1ea6603 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
@@ -18,20 +18,20 @@
package org.apache.giraph.graph;
+import java.io.IOException;
+import java.util.Iterator;
+
import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import java.io.IOException;
-import java.util.Iterator;
-
/**
* See {@link Computation} for explanation of the interface.
*
@@ -52,7 +52,7 @@ import java.util.Iterator;
public abstract class AbstractComputation<I extends WritableComparable,
V extends Writable, E extends Writable, M1 extends Writable,
M2 extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ extends WorkerAggregatorDelegator<I, V, E>
implements Computation<I, V, E, M1, M2> {
/** Logger */
private static final Logger LOG = Logger.getLogger(AbstractComputation.class);
@@ -63,8 +63,6 @@ public abstract class AbstractComputation<I extends WritableComparable,
private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
/** Graph-wide BSP Mapper for this Computation */
private GraphTaskManager<I, V, E> graphTaskManager;
- /** Worker aggregator usage */
- private WorkerAggregatorUsage workerAggregatorUsage;
/** Worker context */
private WorkerContext workerContext;
@@ -76,6 +74,7 @@ public abstract class AbstractComputation<I extends WritableComparable,
* superstep. Each message is only guaranteed to have
* a life expectancy as long as next() is not called.
*/
+ @Override
public abstract void compute(Vertex<I, V, E> vertex,
Iterable<M1> messages) throws IOException;
@@ -103,7 +102,7 @@ public abstract class AbstractComputation<I extends WritableComparable,
* @param graphState Graph state
* @param workerClientRequestProcessor Processor for handling requests
* @param graphTaskManager Graph-wide BSP Mapper for this Vertex
- * @param workerAggregatorUsage Worker aggregator usage
+ * @param workerGlobalCommUsage Worker global communication usage
* @param workerContext Worker context
*/
@Override
@@ -111,12 +110,12 @@ public abstract class AbstractComputation<I extends WritableComparable,
GraphState graphState,
WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
GraphTaskManager<I, V, E> graphTaskManager,
- WorkerAggregatorUsage workerAggregatorUsage,
+ WorkerGlobalCommUsage workerGlobalCommUsage,
WorkerContext workerContext) {
this.graphState = graphState;
this.workerClientRequestProcessor = workerClientRequestProcessor;
this.graphTaskManager = graphTaskManager;
- this.workerAggregatorUsage = workerAggregatorUsage;
+ this.setWorkerGlobalCommUsage(workerGlobalCommUsage);
this.workerContext = workerContext;
}
@@ -274,14 +273,4 @@ public abstract class AbstractComputation<I extends WritableComparable,
public <W extends WorkerContext> W getWorkerContext() {
return (W) workerContext;
}
-
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- workerAggregatorUsage.aggregate(name, value);
- }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return workerAggregatorUsage.<A>getAggregatedValue(name);
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
index 7a7b40f..d310da9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -17,6 +17,9 @@
*/
package org.apache.giraph.graph;
+import java.io.IOException;
+import java.util.Iterator;
+
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.TypesHolder;
@@ -24,13 +27,11 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-import java.io.IOException;
-import java.util.Iterator;
-
/**
* Interface for an application for computation.
*
@@ -55,7 +56,7 @@ public interface Computation<I extends WritableComparable,
M2 extends Writable>
extends TypesHolder<I, V, E, M1, M2>,
ImmutableClassesGiraphConfigurable<I, V, E>,
- WorkerAggregatorUsage {
+ WorkerGlobalCommUsage, WorkerAggregatorUsage {
/**
* Must be defined by user to do computation on a single Vertex.
*
@@ -87,13 +88,13 @@ public interface Computation<I extends WritableComparable,
* @param graphState Graph state
* @param workerClientRequestProcessor Processor for handling requests
* @param graphTaskManager Graph-wide BSP Mapper for this Vertex
- * @param workerAggregatorUsage Worker aggregator usage
+ * @param workerGlobalCommUsage Worker global communication usage
* @param workerContext Worker context
*/
void initialize(GraphState graphState,
WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
GraphTaskManager<I, V, E> graphTaskManager,
- WorkerAggregatorUsage workerAggregatorUsage, WorkerContext workerContext);
+ WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext);
/**
* Retrieves the current superstep.
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index d9c4302..33f2255 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -36,7 +36,7 @@ import org.apache.giraph.utils.TimedLogger;
import org.apache.giraph.utils.Trimmable;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerProgress;
-import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
+import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -135,7 +135,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
new NettyWorkerClientRequestProcessor<I, V, E>(
context, configuration, serviceWorker);
- WorkerThreadAggregatorUsage aggregatorUsage =
+ WorkerThreadGlobalCommUsage aggregatorUsage =
serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
WorkerContext workerContext = serviceWorker.getWorkerContext();
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index ba5d2fa..eb9fad3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -18,6 +18,19 @@
package org.apache.giraph.graph;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -26,9 +39,7 @@ import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.job.JobProgressTracker;
-import org.apache.giraph.scripting.ScriptLoader;
import org.apache.giraph.master.BspServiceMaster;
-import org.apache.giraph.master.MasterAggregatorUsage;
import org.apache.giraph.master.MasterThread;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphMetricsRegistry;
@@ -40,6 +51,7 @@ import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.scripting.ScriptLoader;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
@@ -60,19 +72,6 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
/**
* The Giraph-specific business logic for a single BSP
* compute node in whatever underlying type of cluster
@@ -149,7 +148,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/** Timer for WorkerContext#preSuperstep() */
private GiraphTimer wcPreSuperstepTimer;
/** The Hadoop Mapper#Context for this job */
- private Mapper<?, ?, ?, ?>.Context context;
+ private final Mapper<?, ?, ?, ?>.Context context;
/** is this GraphTaskManager the master? */
private boolean isMaster;
@@ -497,15 +496,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
return graphFunctions;
}
- /**
- * Get master aggregator usage, a subset of the functionality
- *
- * @return Master aggregator usage interface
- */
- public final MasterAggregatorUsage getMasterAggregatorUsage() {
- return serviceMaster.getAggregatorHandler();
- }
-
public final WorkerContext getWorkerContext() {
return serviceWorker.getWorkerContext();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 1bc48e3..83a0369 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -19,9 +19,9 @@
package org.apache.giraph.io;
import java.io.IOException;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+
import org.apache.giraph.edge.Edge;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,11 +36,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
*/
@SuppressWarnings("rawtypes")
public abstract class EdgeReader<I extends WritableComparable,
- E extends Writable> extends DefaultImmutableClassesGiraphConfigurable<
- I, Writable, E> implements WorkerAggregatorUsage {
-
- /** Aggregator usage for edge reader */
- private WorkerAggregatorUsage workerAggregatorUsage;
+ E extends Writable> extends WorkerAggregatorDelegator<
+ I, Writable, E> {
/**
* Use the input split and context to setup reading the edges.
@@ -56,21 +53,6 @@ public abstract class EdgeReader<I extends WritableComparable,
throws IOException, InterruptedException;
/**
- * Set aggregator usage. It provides the functionality
- * of aggregation operation in reading an edge.
- * It is invoked just after initialization.
- * E.g.,
- * edgeReader.initialize(inputSplit, context);
- * edgeReader.setAggregator(aggregatorUsage);
- * This method is only for use by the infrastructure.
- *
- * @param agg aggregator usage for edge reader
- */
- public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
- workerAggregatorUsage = agg;
- }
-
- /**
* Read the next edge.
*
* @return false iff there are no more edges
@@ -117,14 +99,4 @@ public abstract class EdgeReader<I extends WritableComparable,
* @throws InterruptedException
*/
public abstract float getProgress() throws IOException, InterruptedException;
-
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- workerAggregatorUsage.aggregate(name, value);
- }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return workerAggregatorUsage.<A>getAggregatedValue(name);
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
index b7ce97c..7c71585 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
@@ -18,16 +18,15 @@
package org.apache.giraph.io;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import java.io.IOException;
+
import org.apache.giraph.mapping.MappingEntry;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-
/**
* Will read the mapping from an input split.
*
@@ -38,12 +37,7 @@ import java.io.IOException;
*/
public abstract class MappingReader<I extends WritableComparable,
V extends Writable, E extends Writable, B extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
- implements WorkerAggregatorUsage {
-
- /** Aggregator usage for vertex reader */
- private WorkerAggregatorUsage workerAggregatorUsage;
-
+ extends WorkerAggregatorDelegator<I, V, E> {
/**
* Use the input split and context to setup reading the vertices.
* Guaranteed to be called prior to any other function.
@@ -57,22 +51,6 @@ public abstract class MappingReader<I extends WritableComparable,
TaskAttemptContext context)
throws IOException, InterruptedException;
-
- /**
- * Set aggregator usage. It provides the functionality
- * of aggregation operation in reading a vertex.
- * It is invoked just after initialization.
- * E.g.,
- * vertexReader.initialize(inputSplit, context);
- * vertexReader.setAggregator(aggregatorUsage);
- * This method is only for use by the infrastructure.
- *
- * @param agg aggregator usage for vertex reader
- */
- public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
- workerAggregatorUsage = agg;
- }
-
/**
*
* @return false iff there are no more vertices
@@ -111,14 +89,4 @@ public abstract class MappingReader<I extends WritableComparable,
* @throws InterruptedException
*/
public abstract float getProgress() throws IOException, InterruptedException;
-
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- workerAggregatorUsage.aggregate(name, value);
- }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return workerAggregatorUsage.getAggregatedValue(name);
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index 94a4083..64ec800 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -18,16 +18,15 @@
package org.apache.giraph.io;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import java.io.IOException;
+
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-
/**
* Analogous to Hadoop's RecordReader for vertices. Will read the
* vertices from an input split.
@@ -39,11 +38,7 @@ import java.io.IOException;
@SuppressWarnings("rawtypes")
public abstract class VertexReader<I extends WritableComparable,
V extends Writable, E extends Writable> extends
- DefaultImmutableClassesGiraphConfigurable<I, V, E>
- implements WorkerAggregatorUsage {
- /** Aggregator usage for vertex reader */
- private WorkerAggregatorUsage workerAggregatorUsage;
-
+ WorkerAggregatorDelegator<I, V, E> {
/**
* Use the input split and context to setup reading the vertices.
* Guaranteed to be called prior to any other function.
@@ -58,21 +53,6 @@ public abstract class VertexReader<I extends WritableComparable,
throws IOException, InterruptedException;
/**
- * Set aggregator usage. It provides the functionality
- * of aggregation operation in reading a vertex.
- * It is invoked just after initialization.
- * E.g.,
- * vertexReader.initialize(inputSplit, context);
- * vertexReader.setAggregator(aggregatorUsage);
- * This method is only for use by the infrastructure.
- *
- * @param agg aggregator usage for vertex reader
- */
- public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
- workerAggregatorUsage = agg;
- }
-
- /**
*
* @return false iff there are no more vertices
* @throws IOException
@@ -108,14 +88,4 @@ public abstract class VertexReader<I extends WritableComparable,
* @throws InterruptedException
*/
public abstract float getProgress() throws IOException, InterruptedException;
-
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- workerAggregatorUsage.aggregate(name, value);
- }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return workerAggregatorUsage.<A>getAggregatedValue(name);
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
index 9b5e8c6..05dd5bc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -18,18 +18,18 @@
package org.apache.giraph.io.internal;
+import java.io.IOException;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.job.HadoopUtils;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-
/**
* For internal use only.
*
@@ -72,9 +72,10 @@ public class WrappedEdgeReader<I extends WritableComparable,
}
@Override
- public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
- // Set aggregator usage for edge reader
- baseEdgeReader.setWorkerAggregatorUse(agg);
+ public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+ super.setWorkerGlobalCommUsage(usage);
+ // Set global communication usage for edge reader
+ baseEdgeReader.setWorkerGlobalCommUsage(usage);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
index 7d1c4c9..659776b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
@@ -18,18 +18,18 @@
package org.apache.giraph.io.internal;
+import java.io.IOException;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.MappingReader;
import org.apache.giraph.job.HadoopUtils;
import org.apache.giraph.mapping.MappingEntry;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-
/**
* For internal use only.
*
@@ -74,11 +74,11 @@ public class WrappedMappingReader<I extends WritableComparable,
HadoopUtils.makeTaskAttemptContext(getConf(), context));
}
-
@Override
- public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
- // Set aggregator usage for vertex reader
- baseMappingReader.setWorkerAggregatorUse(agg);
+ public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+ super.setWorkerGlobalCommUsage(usage);
+    // Set global communication usage for mapping reader
+ baseMappingReader.setWorkerGlobalCommUsage(usage);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
index 8e25602..8c23cba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -18,18 +18,18 @@
package org.apache.giraph.io.internal;
+import java.io.IOException;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.job.HadoopUtils;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-
/**
* For internal use only.
*
@@ -73,9 +73,10 @@ public class WrappedVertexReader<I extends WritableComparable,
}
@Override
- public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+ public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+ super.setWorkerGlobalCommUsage(usage);
    // Set global communication usage for vertex reader
- baseVertexReader.setWorkerAggregatorUse(agg);
+ baseVertexReader.setWorkerGlobalCommUsage(usage);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
new file mode 100644
index 0000000..1673f6d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Translates aggregation operation to reduce operations.
+ *
+ * @param <A> Aggregation object type
+ */
+public class AggregatorReduceOperation<A extends Writable>
+ extends OnSameReduceOperation<A> {
+ /** Aggregator factory */
+ private WritableFactory<? extends Aggregator<A>> aggregatorFactory;
+ /** Aggregator */
+ private Aggregator<A> aggregator;
+
+ /** Constructor */
+ public AggregatorReduceOperation() {
+ }
+
+ /**
+ * Constructor
+ * @param aggregatorFactory Aggregator factory
+ */
+ public AggregatorReduceOperation(
+ WritableFactory<? extends Aggregator<A>> aggregatorFactory) {
+ this.aggregatorFactory = aggregatorFactory;
+ this.aggregator = aggregatorFactory.create();
+ this.aggregator.setAggregatedValue(null);
+ }
+
+ @Override
+ public A createInitialValue() {
+ return aggregator.createInitialValue();
+ }
+
+ /**
+ * Creates copy of this object
+ * @return copy
+ */
+ public AggregatorReduceOperation<A> createCopy() {
+ return new AggregatorReduceOperation<>(aggregatorFactory);
+ }
+
+ @Override
+ public synchronized void reduceSingle(A curValue, A valueToReduce) {
+ aggregator.setAggregatedValue(curValue);
+ aggregator.aggregate(valueToReduce);
+ if (curValue != aggregator.getAggregatedValue()) {
+ throw new IllegalStateException(
+ "Aggregator " + aggregator + " aggregates by creating new value");
+ }
+ aggregator.setAggregatedValue(null);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeWritableObject(aggregatorFactory, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ aggregatorFactory = WritableUtils.readWritableObject(in, null);
+ aggregator = aggregatorFactory.create();
+ this.aggregator.setAggregatedValue(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
new file mode 100644
index 0000000..7492fc7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.aggregators.ClassAggregatorFactory;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Class that translates aggregator handling on the master to
+ * reduce and broadcast operations supported by the MasterAggregatorHandler.
+ */
+public class AggregatorToGlobalCommTranslation
+ extends DefaultImmutableClassesGiraphConfigurable
+ implements MasterAggregatorUsage, Writable {
+ /** Class providing reduce and broadcast interface to use */
+ private final MasterGlobalCommUsage globalComm;
+ /** List of registered aggregators */
+ private final HashMap<String, AggregatorWrapper<Writable>>
+ registeredAggregators = new HashMap<>();
+
+ /**
+ * Constructor
+ * @param globalComm Global communication interface
+ */
+ public AggregatorToGlobalCommTranslation(MasterGlobalCommUsage globalComm) {
+ this.globalComm = globalComm;
+ }
+
+ @Override
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return globalComm.getReduced(name);
+ }
+
+ @Override
+ public <A extends Writable> void setAggregatedValue(String name, A value) {
+ AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name);
+ aggregator.setCurrentValue(value);
+ }
+
+ /**
+ * Called after master compute, to do aggregator->reduce/broadcast
+ * translation
+ */
+ public void postMasterCompute() {
+ // broadcast what master set, or if it didn't broadcast reduced value
+ // register reduce with the same value
+ for (Entry<String, AggregatorWrapper<Writable>> entry :
+ registeredAggregators.entrySet()) {
+ Writable value = entry.getValue().currentValue != null ?
+ entry.getValue().getCurrentValue() :
+ globalComm.getReduced(entry.getKey());
+ if (value == null) {
+ value = entry.getValue().getReduceOp().createInitialValue();
+ }
+
+ globalComm.broadcast(entry.getKey(), value);
+ // Always register clean instance of reduceOp, not to conflict with
+ // reduceOp from previous superstep.
+ AggregatorReduceOperation<Writable> cleanReduceOp =
+ entry.getValue().createReduceOp();
+ if (entry.getValue().isPersistent()) {
+ globalComm.registerReduce(
+ entry.getKey(), cleanReduceOp, value);
+ } else {
+ globalComm.registerReduce(
+ entry.getKey(), cleanReduceOp);
+ }
+ entry.getValue().setCurrentValue(null);
+ }
+ }
+
+ @Override
+ public <A extends Writable> boolean registerAggregator(String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException {
+ ClassAggregatorFactory<A> aggregatorFactory =
+ new ClassAggregatorFactory<A>(aggregatorClass);
+ return registerAggregator(name, aggregatorFactory, false) != null;
+ }
+
+ @Override
+ public <A extends Writable> boolean registerAggregator(String name,
+ WritableFactory<? extends Aggregator<A>> aggregator) throws
+ InstantiationException, IllegalAccessException {
+ return registerAggregator(name, aggregator, false) != null;
+ }
+
+ @Override
+ public <A extends Writable> boolean registerPersistentAggregator(String name,
+ Class<? extends Aggregator<A>> aggregatorClass) throws
+ InstantiationException, IllegalAccessException {
+ ClassAggregatorFactory<A> aggregatorFactory =
+ new ClassAggregatorFactory<A>(aggregatorClass);
+ return registerAggregator(name, aggregatorFactory, true) != null;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(registeredAggregators.size());
+ for (Entry<String, AggregatorWrapper<Writable>> entry :
+ registeredAggregators.entrySet()) {
+ out.writeUTF(entry.getKey());
+ entry.getValue().write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ registeredAggregators.clear();
+ int numAggregators = in.readInt();
+ for (int i = 0; i < numAggregators; i++) {
+ String name = in.readUTF();
+ AggregatorWrapper<Writable> agg = new AggregatorWrapper<>();
+ agg.readFields(in);
+ registeredAggregators.put(name, agg);
+ }
+ }
+
+ /**
+ * Helper function for registering aggregators.
+ *
+ * @param name Name of the aggregator
+ * @param aggregatorFactory Aggregator factory
+ * @param persistent Whether aggregator is persistent or not
+ * @param <A> Aggregated value type
+ * @return Newly registered aggregator or aggregator which was previously
+ * created with selected name, if any
+ */
+ private <A extends Writable> AggregatorWrapper<A> registerAggregator
+ (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+ boolean persistent) throws InstantiationException,
+ IllegalAccessException {
+ AggregatorWrapper<A> aggregatorWrapper =
+ (AggregatorWrapper<A>) registeredAggregators.get(name);
+ if (aggregatorWrapper == null) {
+ aggregatorWrapper =
+ new AggregatorWrapper<A>(aggregatorFactory, persistent);
+ registeredAggregators.put(
+ name, (AggregatorWrapper<Writable>) aggregatorWrapper);
+ }
+ return aggregatorWrapper;
+ }
+
+ /**
+ * Object holding all needed data related to single Aggregator
+ * @param <A> Aggregated value type
+ */
+ private static class AggregatorWrapper<A extends Writable>
+ implements Writable {
+ /** False iff aggregator should be reset at the end of each super step */
+ private boolean persistent;
+ /** Translation of aggregator to reduce operations */
+ private AggregatorReduceOperation<A> reduceOp;
+ /** Current value, set by master manually */
+ private A currentValue;
+
+ /** Constructor */
+ public AggregatorWrapper() {
+ }
+
+ /**
+ * Constructor
+ * @param aggregatorFactory Aggregator factory
+ * @param persistent Is persistent
+ */
+ public AggregatorWrapper(
+ WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+ boolean persistent) {
+ this.persistent = persistent;
+ this.reduceOp = new AggregatorReduceOperation<>(aggregatorFactory);
+ }
+
+ public AggregatorReduceOperation<A> getReduceOp() {
+ return reduceOp;
+ }
+
+ /**
+ * Create a fresh instance of AggregatorReduceOperation
+ * @return fresh instance of AggregatorReduceOperation
+ */
+ public AggregatorReduceOperation<A> createReduceOp() {
+ return reduceOp.createCopy();
+ }
+
+ public A getCurrentValue() {
+ return currentValue;
+ }
+
+ public void setCurrentValue(A currentValue) {
+ this.currentValue = currentValue;
+ }
+
+ public boolean isPersistent() {
+ return persistent;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(persistent);
+ reduceOp.write(out);
+
+ Preconditions.checkState(currentValue == null, "AggregatorWrapper " +
+ "shouldn't have value at the end of the superstep");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ persistent = in.readBoolean();
+ reduceOp = new AggregatorReduceOperation<>();
+ reduceOp.readFields(in);
+ currentValue = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index efa5b87..ab1289d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -18,11 +18,39 @@
package org.apache.giraph.master;
+import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
+import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
+import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
+import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import net.iharder.Base64;
+
import org.apache.commons.io.FilenameUtils;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
+import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.bsp.SuperstepState;
@@ -33,23 +61,17 @@ import org.apache.giraph.comm.netty.NettyMasterServer;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.counters.GiraphStats;
-import org.apache.giraph.graph.InputSplitPaths;
-import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphFunctions;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.bsp.BspService;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.GiraphInputFormat;
-import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.partition.BasicPartitionOwner;
-import org.apache.giraph.partition.MasterGraphPartitioner;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.partition.PartitionUtils;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
@@ -57,13 +79,18 @@ import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.WorkerSuperstepMetrics;
-import org.apache.giraph.utils.CheckpointingUtils;
-import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.ReactiveJMapHistoDumper;
-import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.partition.BasicPartitionOwner;
+import org.apache.giraph.partition.MasterGraphPartitioner;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionUtils;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
+import org.apache.giraph.utils.CheckpointingUtils;
+import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.zk.BspEvent;
@@ -89,32 +116,6 @@ import org.json.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import net.iharder.Base64;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
-import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
-import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
-import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
/**
* ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
@@ -167,8 +168,10 @@ public class BspServiceMaster<I extends WritableComparable,
/** All the partition stats from the last superstep */
private final List<PartitionStats> allPartitionStatsList =
new ArrayList<PartitionStats>();
- /** Handler for aggregators */
- private MasterAggregatorHandler aggregatorHandler;
+ /** Handler for global communication */
+ private MasterAggregatorHandler globalCommHandler;
+ /** Handler for aggregators to reduce/broadcast translation */
+ private AggregatorToGlobalCommTranslation aggregatorTranslation;
/** Master class */
private MasterCompute masterCompute;
/** IPC Client */
@@ -232,7 +235,7 @@ public class BspServiceMaster<I extends WritableComparable,
this.checkpointStatus = CheckpointStatus.NONE;
GiraphMetrics.get().addSuperstepResetObserver(this);
- GiraphStats.init((Mapper.Context) context);
+ GiraphStats.init(context);
}
@Override
@@ -738,8 +741,13 @@ public class BspServiceMaster<I extends WritableComparable,
}
@Override
- public MasterAggregatorHandler getAggregatorHandler() {
- return aggregatorHandler;
+ public MasterAggregatorHandler getGlobalCommHandler() {
+ return globalCommHandler;
+ }
+
+ @Override
+ public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
+ return aggregatorTranslation;
}
@Override
@@ -811,7 +819,8 @@ public class BspServiceMaster<I extends WritableComparable,
});
- aggregatorHandler.readFields(finalizedStream);
+ globalCommHandler.readFields(finalizedStream);
+ aggregatorTranslation.readFields(finalizedStream);
masterCompute.readFields(finalizedStream);
finalizedStream.close();
@@ -883,9 +892,12 @@ public class BspServiceMaster<I extends WritableComparable,
if (masterChildArr.get(0).equals(myBid)) {
GiraphStats.getInstance().getCurrentMasterTaskPartition().
setValue(getTaskPartition());
- aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
- getContext());
- aggregatorHandler.initialize(this);
+ globalCommHandler = new MasterAggregatorHandler(
+ getConfiguration(), getContext());
+ aggregatorTranslation = new AggregatorToGlobalCommTranslation(
+ globalCommHandler);
+
+ globalCommHandler.initialize(this);
masterCompute = getConfiguration().createMasterCompute();
masterCompute.setMasterService(this);
@@ -1097,7 +1109,8 @@ public class BspServiceMaster<I extends WritableComparable,
for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId());
}
- aggregatorHandler.write(finalizedOutputStream);
+ globalCommHandler.write(finalizedOutputStream);
+ aggregatorTranslation.write(finalizedOutputStream);
masterCompute.write(finalizedOutputStream);
finalizedOutputStream.close();
lastCheckpointedSuperstep = superstep;
@@ -1502,7 +1515,8 @@ public class BspServiceMaster<I extends WritableComparable,
*/
private void initializeAggregatorInputSuperstep()
throws InterruptedException {
- aggregatorHandler.prepareSuperstep(masterClient);
+ globalCommHandler.prepareSuperstep();
+
prepareMasterCompute(getSuperstep());
try {
masterCompute.initialize();
@@ -1516,7 +1530,10 @@ public class BspServiceMaster<I extends WritableComparable,
throw new RuntimeException(
"initializeAggregatorInputSuperstep: Failed in access", e);
}
- aggregatorHandler.finishSuperstep(masterClient);
+ aggregatorTranslation.postMasterCompute();
+ globalCommHandler.finishSuperstep();
+
+ globalCommHandler.sendDataToOwners(masterClient);
}
/**
@@ -1579,18 +1596,18 @@ public class BspServiceMaster<I extends WritableComparable,
}
}
+ // We need to finalize aggregators from previous superstep
+ if (getSuperstep() >= 0) {
+ aggregatorTranslation.postMasterCompute();
+ globalCommHandler.finishSuperstep();
+ }
+
masterClient.openConnections();
GiraphStats.getInstance().
getCurrentWorkers().setValue(chosenWorkerInfoList.size());
assignPartitionOwners();
- // We need to finalize aggregators from previous superstep (send them to
- // worker owners) after new worker assignments
- if (getSuperstep() >= 0) {
- aggregatorHandler.finishSuperstep(masterClient);
- }
-
// Finalize the valid checkpoint file prefixes and possibly
// the aggregators.
if (checkpointStatus != CheckpointStatus.NONE) {
@@ -1616,6 +1633,11 @@ public class BspServiceMaster<I extends WritableComparable,
}
}
+ // We need to send aggregators to worker owners after new worker assignments
+ if (getSuperstep() >= 0) {
+ globalCommHandler.sendDataToOwners(masterClient);
+ }
+
if (getSuperstep() == INPUT_SUPERSTEP) {
// Initialize aggregators before coordinating
initializeAggregatorInputSuperstep();
@@ -1645,7 +1667,7 @@ public class BspServiceMaster<I extends WritableComparable,
// Collect aggregator values, then run the master.compute() and
// finally save the aggregator values
- aggregatorHandler.prepareSuperstep(masterClient);
+ globalCommHandler.prepareSuperstep();
SuperstepClasses superstepClasses =
prepareMasterCompute(getSuperstep() + 1);
doMasterCompute();
@@ -1710,7 +1732,7 @@ public class BspServiceMaster<I extends WritableComparable,
} else {
superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
}
- aggregatorHandler.writeAggregators(getSuperstep(), superstepState);
+ globalCommHandler.writeAggregators(getSuperstep(), superstepState);
return superstepState;
}
@@ -1935,7 +1957,7 @@ public class BspServiceMaster<I extends WritableComparable,
failJob(new Exception("Checkpoint and halt requested. " +
"Killing this job."));
}
- aggregatorHandler.close();
+ globalCommHandler.close();
masterClient.closeConnections();
masterServer.close();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 2b0cdd6..5f7bd73 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -15,263 +15,224 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.giraph.master;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.AbstractMap;
import java.util.Map;
+import java.util.Map.Entry;
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.aggregators.AggregatorWrapper;
import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.aggregators.ClassAggregatorFactory;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.MasterLoggingAggregator;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-/** Handler for aggregators on master */
-public class MasterAggregatorHandler implements MasterAggregatorUsage,
- Writable {
+/** Handler for reduce/broadcast on the master */
+public class MasterAggregatorHandler
+ implements MasterGlobalCommUsage, Writable {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(MasterAggregatorHandler.class);
- /**
- * Map of aggregators.
- * This map is used to store final aggregated values received from worker
- * owners, and also to read and write values provided during master.compute.
- */
- private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+
+ /** Map of reducers registered for the next worker computation */
+ private final Map<String, Reducer<Object, Writable>> reducerMap =
+ Maps.newHashMap();
+ /** Map of values to be sent to workers for next computation */
+ private final Map<String, Writable> broadcastMap =
Maps.newHashMap();
- /** Aggregator writer */
+ /** Values reduced from previous computation */
+ private final Map<String, Writable> reducedMap =
+ Maps.newHashMap();
+
+ /** Aggregator writer - for writing reduced values */
private final AggregatorWriter aggregatorWriter;
/** Progressable used to report progress */
private final Progressable progressable;
- /** Giraph configuration */
- private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
/**
* Constructor
*
- * @param conf Giraph configuration
- * @param progressable Progressable used for reporting progress
+ * @param conf Configuration
+ * @param progressable Progress reporter
*/
public MasterAggregatorHandler(
ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
Progressable progressable) {
- this.conf = conf;
this.progressable = progressable;
aggregatorWriter = conf.createAggregatorWriter();
- MasterLoggingAggregator.registerAggregator(this, conf);
}
@Override
- public <A extends Writable> A getAggregatedValue(String name) {
- AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
- if (aggregator == null) {
- LOG.warn("getAggregatedValue: " +
- AggregatorUtils.getUnregisteredAggregatorMessage(name,
- aggregatorMap.size() != 0, conf));
- return null;
- } else {
- return (A) aggregator.getPreviousAggregatedValue();
- }
+ public final <S, R extends Writable> void registerReduce(
+ String name, ReduceOperation<S, R> reduceOp) {
+ registerReduce(name, reduceOp, reduceOp.createInitialValue());
}
@Override
- public <A extends Writable> void setAggregatedValue(String name, A value) {
- AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
- if (aggregator == null) {
- throw new IllegalStateException(
- "setAggregatedValue: " +
- AggregatorUtils.getUnregisteredAggregatorMessage(name,
- aggregatorMap.size() != 0, conf));
+ public <S, R extends Writable> void registerReduce(
+ String name, ReduceOperation<S, R> reduceOp,
+ R globalInitialValue) {
+ if (reducerMap.containsKey(name)) {
+ throw new IllegalArgumentException(
+ "Reducer with name " + name + " was already registered");
+ }
+ if (reduceOp == null) {
+ throw new IllegalArgumentException("null reduce cannot be registered");
}
- ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
- }
- @Override
- public <A extends Writable> boolean registerAggregator(String name,
- Class<? extends Aggregator<A>> aggregatorClass) throws
- InstantiationException, IllegalAccessException {
- checkAggregatorName(name);
- ClassAggregatorFactory<A> aggregatorFactory =
- new ClassAggregatorFactory<A>(aggregatorClass, conf);
- return registerAggregator(name, aggregatorFactory, false) != null;
+ Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
+ reducerMap.put(name, (Reducer<Object, Writable>) reducer);
}
@Override
- public <A extends Writable> boolean registerAggregator(String name,
- WritableFactory<? extends Aggregator<A>> aggregator) throws
- InstantiationException, IllegalAccessException {
- checkAggregatorName(name);
- return registerAggregator(name, aggregator, false) != null;
+ public <T extends Writable> T getReduced(String name) {
+ return (T) reducedMap.get(name);
}
@Override
- public <A extends Writable> boolean registerPersistentAggregator(String name,
- Class<? extends Aggregator<A>> aggregatorClass) throws
- InstantiationException, IllegalAccessException {
- checkAggregatorName(name);
- ClassAggregatorFactory<A> aggregatorFactory =
- new ClassAggregatorFactory<A>(aggregatorClass, conf);
- return registerAggregator(name, aggregatorFactory, true) != null;
- }
-
- /**
- * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as
- * the name of aggregator. Throw an exception if he tries to use it.
- *
- * @param name Name of the aggregator to check.
- */
- private void checkAggregatorName(String name) {
- if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
- throw new IllegalStateException("checkAggregatorName: " +
- AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
- " is not allowed for the name of aggregator");
+ public void broadcast(String name, Writable object) {
+ if (broadcastMap.containsKey(name)) {
+ throw new IllegalArgumentException(
+ "Value already broadcasted for name " + name);
}
- }
-
- /**
- * Helper function for registering aggregators.
- *
- * @param name Name of the aggregator
- * @param aggregatorFactory Aggregator factory
- * @param persistent Whether aggregator is persistent or not
- * @param <A> Aggregated value type
- * @return Newly registered aggregator or aggregator which was previously
- * created with selected name, if any
- */
- private <A extends Writable> AggregatorWrapper<A> registerAggregator
- (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
- boolean persistent) throws InstantiationException,
- IllegalAccessException {
- AggregatorWrapper<A> aggregatorWrapper =
- (AggregatorWrapper<A>) aggregatorMap.get(name);
- if (aggregatorWrapper == null) {
- aggregatorWrapper =
- new AggregatorWrapper<A>(aggregatorFactory, persistent, conf);
- aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
+ if (object == null) {
+ throw new IllegalArgumentException("null cannot be broadcasted");
}
- return aggregatorWrapper;
+
+ broadcastMap.put(name, object);
}
- /**
- * Prepare aggregators for current superstep
- *
- * @param masterClient IPC client on master
- */
- public void prepareSuperstep(MasterClient masterClient) {
+ /** Prepare reduced values for current superstep's master compute */
+ public void prepareSuperstep() {
if (LOG.isDebugEnabled()) {
- LOG.debug("prepareSuperstep: Start preparing aggregators");
+ LOG.debug("prepareSuperstep: Start preparing reducers");
}
- // prepare aggregators for master compute
- for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
- if (aggregator.isPersistent()) {
- aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
+
+ Preconditions.checkState(reducedMap.isEmpty(),
+ "reducedMap must be empty before start of the superstep");
+ Preconditions.checkState(broadcastMap.isEmpty(),
+ "broadcastMap must be empty before start of the superstep");
+
+ for (Entry<String, Reducer<Object, Writable>> entry :
+ reducerMap.entrySet()) {
+ Writable value = entry.getValue().getCurrentValue();
+ if (value == null) {
+ value = entry.getValue().createInitialValue();
}
- aggregator.setPreviousAggregatedValue(
- aggregator.getCurrentAggregatedValue());
- aggregator.resetCurrentAggregator();
- progressable.progress();
+
+ reducedMap.put(entry.getKey(), value);
}
- MasterLoggingAggregator.logAggregatedValue(this, conf);
+
+ reducerMap.clear();
+
if (LOG.isDebugEnabled()) {
LOG.debug("prepareSuperstep: Aggregators prepared");
}
}
- /**
- * Finalize aggregators for current superstep and share them with workers
- *
- * @param masterClient IPC client on master
- */
- public void finishSuperstep(MasterClient masterClient) {
+  /** Finalize reduced values for current superstep */
+ public void finishSuperstep() {
if (LOG.isDebugEnabled()) {
LOG.debug("finishSuperstep: Start finishing aggregators");
}
- for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
- if (aggregator.isChanged()) {
- // if master compute changed the value, use the one he chose
- aggregator.setPreviousAggregatedValue(
- aggregator.getCurrentAggregatedValue());
- // reset aggregator for the next superstep
- aggregator.resetCurrentAggregator();
- }
- progressable.progress();
+
+ reducedMap.clear();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finishSuperstep: Aggregators finished");
}
+ }
- // send aggregators to their owners
- // TODO: if aggregator owner and it's value didn't change,
- // we don't need to resend it
+ /**
+ * Send data to workers (through owner workers)
+ *
+ * @param masterClient IPC client on master
+ */
+ public void sendDataToOwners(MasterClient masterClient) {
+ // send broadcast values and reduceOperations to their owners
try {
- for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
- aggregatorMap.entrySet()) {
- masterClient.sendAggregator(entry.getKey(),
- entry.getValue().getAggregatorFactory(),
- entry.getValue().getPreviousAggregatedValue());
+ for (Entry<String, Reducer<Object, Writable>> entry :
+ reducerMap.entrySet()) {
+ masterClient.sendToOwner(entry.getKey(),
+ GlobalCommType.REDUCE_OPERATIONS,
+ entry.getValue().getReduceOp());
progressable.progress();
}
- masterClient.finishSendingAggregatedValues();
+
+ for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
+ masterClient.sendToOwner(entry.getKey(),
+ GlobalCommType.BROADCAST,
+ entry.getValue());
+ progressable.progress();
+ }
+ masterClient.finishSendingValues();
+
+ broadcastMap.clear();
} catch (IOException e) {
throw new IllegalStateException("finishSuperstep: " +
"IOException occurred while sending aggregators", e);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("finishSuperstep: Aggregators finished");
- }
}
/**
- * Accept aggregated values sent by worker. Every aggregator will be sent
+ * Accept reduced values sent by worker. Every value will be sent
* only once, by its owner.
* We don't need to count the number of these requests because global
* superstep barrier will happen after workers ensure all requests of this
* type have been received and processed by master.
*
- * @param aggregatedValuesInput Input in which aggregated values are
+   * @param reducedValuesInput Input in which reduced values are
* written in the following format:
- * number_of_aggregators
- * name_1 value_1
- * name_2 value_2
+ * numReducers
+ * name_1 REDUCED_VALUE value_1
+ * name_2 REDUCED_VALUE value_2
* ...
* @throws IOException
*/
- public void acceptAggregatedValues(
- DataInput aggregatedValuesInput) throws IOException {
- int numAggregators = aggregatedValuesInput.readInt();
- for (int i = 0; i < numAggregators; i++) {
- String aggregatorName = aggregatedValuesInput.readUTF();
- AggregatorWrapper<Writable> aggregator =
- aggregatorMap.get(aggregatorName);
- if (aggregator == null) {
+ public void acceptReducedValues(
+ DataInput reducedValuesInput) throws IOException {
+ int numReducers = reducedValuesInput.readInt();
+ for (int i = 0; i < numReducers; i++) {
+ String name = reducedValuesInput.readUTF();
+ GlobalCommType type =
+ GlobalCommType.values()[reducedValuesInput.readByte()];
+ if (type != GlobalCommType.REDUCED_VALUE) {
throw new IllegalStateException(
- "acceptAggregatedValues: " +
- "Master received aggregator which isn't registered: " +
- aggregatorName);
+ "SendReducedToMasterRequest received " + type);
+ }
+ Reducer<Object, Writable> reducer = reducerMap.get(name);
+ if (reducer == null) {
+ throw new IllegalStateException(
+ "acceptReducedValues: " +
+ "Master received reduced value which isn't registered: " +
+ name);
+ }
+
+ Writable valueToReduce = reducer.createInitialValue();
+ valueToReduce.readFields(reducedValuesInput);
+
+ if (reducer.getCurrentValue() != null) {
+ reducer.reducePartial(valueToReduce);
+ } else {
+ reducer.setCurrentValue(valueToReduce);
}
- Writable aggregatorValue = aggregator.createInitialValue();
- aggregatorValue.readFields(aggregatedValuesInput);
- aggregator.setCurrentAggregatedValue(aggregatorValue);
progressable.progress();
}
if (LOG.isDebugEnabled()) {
- LOG.debug("acceptAggregatedValues: Accepted one set with " +
- numAggregators + " aggregated values");
+ LOG.debug("acceptReducedValues: Accepted one set with " +
+ numReducers + " aggregated values");
}
}
@@ -281,23 +242,10 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
* @param superstep Superstep which just finished
* @param superstepState State of the superstep which just finished
*/
- public void writeAggregators(long superstep, SuperstepState superstepState) {
+ public void writeAggregators(
+ long superstep, SuperstepState superstepState) {
try {
- Iterable<Map.Entry<String, Writable>> iter =
- Iterables.transform(
- aggregatorMap.entrySet(),
- new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
- Map.Entry<String, Writable>>() {
- @Override
- public Map.Entry<String, Writable> apply(
- Map.Entry<String, AggregatorWrapper<Writable>> entry) {
- progressable.progress();
- return new AbstractMap.SimpleEntry<String,
- Writable>(entry.getKey(),
- entry.getValue().getPreviousAggregatedValue());
- }
- });
- aggregatorWriter.writeAggregator(iter,
+ aggregatorWriter.writeAggregator(reducedMap.entrySet(),
(superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
AggregatorWriter.LAST_SUPERSTEP : superstep);
} catch (IOException e) {
@@ -333,43 +281,44 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(aggregatorMap.size());
- for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
- aggregatorMap.entrySet()) {
+    // At the end of superstep, only reducerMap and broadcastMap can be
+    // non-empty; reducedMap must already have been cleared
+ Preconditions.checkState(reducedMap.isEmpty(),
+ "reducedMap must be empty at the end of the superstep");
+
+ out.writeInt(reducerMap.size());
+ for (Entry<String, Reducer<Object, Writable>> entry :
+ reducerMap.entrySet()) {
out.writeUTF(entry.getKey());
- WritableUtils.writeWritableObject(
- entry.getValue().getAggregatorFactory(), out);
- out.writeBoolean(entry.getValue().isPersistent());
- entry.getValue().getPreviousAggregatedValue().write(out);
+ entry.getValue().write(out);
progressable.progress();
}
+
+ out.writeInt(broadcastMap.size());
+ for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
+ out.writeUTF(entry.getKey());
+ WritableUtils.writeWritableObject(entry.getValue(), out);
+ }
}
@Override
public void readFields(DataInput in) throws IOException {
- aggregatorMap.clear();
- int numAggregators = in.readInt();
- try {
- for (int i = 0; i < numAggregators; i++) {
- String aggregatorName = in.readUTF();
- WritableFactory<Aggregator<Writable>> aggregatorFactory =
- WritableUtils.readWritableObject(in, conf);
- boolean isPersistent = in.readBoolean();
- AggregatorWrapper<Writable> aggregatorWrapper = registerAggregator(
- aggregatorName,
- aggregatorFactory,
- isPersistent);
- Writable value = aggregatorWrapper.createInitialValue();
- value.readFields(in);
- aggregatorWrapper.setPreviousAggregatedValue(value);
- progressable.progress();
- }
- } catch (InstantiationException e) {
- throw new IllegalStateException("readFields: " +
- "InstantiationException occurred", e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("readFields: " +
- "IllegalAccessException occurred", e);
+ reducedMap.clear();
+ broadcastMap.clear();
+ reducerMap.clear();
+
+ int numReducers = in.readInt();
+ for (int i = 0; i < numReducers; i++) {
+ String name = in.readUTF();
+ Reducer<Object, Writable> reducer = new Reducer<>();
+ reducer.readFields(in);
+ reducerMap.put(name, reducer);
+ }
+
+ int numBroadcast = in.readInt();
+ for (int i = 0; i < numBroadcast; i++) {
+ String name = in.readUTF();
+ Writable value = WritableUtils.readWritableObject(in, null);
+ broadcastMap.put(name, value);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 552cca9..72e4d0a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -24,6 +24,7 @@ import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -43,7 +44,7 @@ import org.apache.hadoop.mapreduce.Mapper;
*/
public abstract class MasterCompute
extends DefaultImmutableClassesGiraphConfigurable
- implements MasterAggregatorUsage, Writable {
+ implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable {
/** If true, do not do anymore computation on this vertex. */
private boolean halt = false;
/** Master aggregator usage */
@@ -190,10 +191,33 @@ public abstract class MasterCompute
}
@Override
+ public final <S, R extends Writable> void registerReduce(
+ String name, ReduceOperation<S, R> reduceOp) {
+ serviceMaster.getGlobalCommHandler().registerReduce(name, reduceOp);
+ }
+
+ @Override
+ public final <S, R extends Writable> void registerReduce(
+ String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+ serviceMaster.getGlobalCommHandler().registerReduce(
+ name, reduceOp, globalInitialValue);
+ }
+
+ @Override
+ public final <T extends Writable> T getReduced(String name) {
+ return serviceMaster.getGlobalCommHandler().getReduced(name);
+ }
+
+ @Override
+ public final void broadcast(String name, Writable object) {
+ serviceMaster.getGlobalCommHandler().broadcast(name, object);
+ }
+
+ @Override
public final <A extends Writable> boolean registerAggregator(
String name, Class<? extends Aggregator<A>> aggregatorClass)
throws InstantiationException, IllegalAccessException {
- return serviceMaster.getAggregatorHandler().registerAggregator(
+ return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
name, aggregatorClass);
}
@@ -201,7 +225,7 @@ public abstract class MasterCompute
public final <A extends Writable> boolean registerAggregator(
String name, WritableFactory<? extends Aggregator<A>> aggregator)
throws InstantiationException, IllegalAccessException {
- return serviceMaster.getAggregatorHandler().registerAggregator(
+ return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
name, aggregator);
}
@@ -210,19 +234,21 @@ public abstract class MasterCompute
String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
InstantiationException, IllegalAccessException {
- return serviceMaster.getAggregatorHandler().registerPersistentAggregator(
- name, aggregatorClass);
+ return serviceMaster.getAggregatorTranslationHandler()
+ .registerPersistentAggregator(name, aggregatorClass);
}
@Override
public final <A extends Writable> A getAggregatedValue(String name) {
- return serviceMaster.getAggregatorHandler().<A>getAggregatedValue(name);
+ return serviceMaster.getAggregatorTranslationHandler()
+ .<A>getAggregatedValue(name);
}
@Override
public final <A extends Writable> void setAggregatedValue(
String name, A value) {
- serviceMaster.getAggregatorHandler().setAggregatedValue(name, value);
+ serviceMaster.getAggregatorTranslationHandler()
+ .setAggregatedValue(name, value);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
new file mode 100644
index 0000000..c3ce0ea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Master compute can access reduce and broadcast methods
+ * through this interface, from masterCompute method.
+ */
+public interface MasterGlobalCommUsage {
+ /**
+ * Register reducer to be reduced in the next worker computation,
+ * using given name and operations.
+ * @param name Name of the reducer
+ * @param reduceOp Reduce operations
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+ <S, R extends Writable> void registerReduce(
+ String name, ReduceOperation<S, R> reduceOp);
+
+ /**
+ * Register reducer to be reduced in the next worker computation, using
+ * given name and operations, starting globally from globalInitialValue.
+ * (globalInitialValue is reduced only once, each worker will still start
+ * from neutral initial value)
+ *
+ * @param name Name of the reducer
+ * @param reduceOp Reduce operations
+ * @param globalInitialValue Global initial value
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+ <S, R extends Writable> void registerReduce(
+ String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
+
+ /**
+ * Get reduced value from previous worker computation.
+ * @param name Name of the reducer
+   * @param <R> Reduced value type
+   * @return Reduced value
+ */
+ <R extends Writable> R getReduced(String name);
+
+ /**
+ * Broadcast given value to all workers for next computation.
+ * @param name Name of the broadcast object
+ * @param value Value
+ */
+ void broadcast(String name, Writable value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
new file mode 100644
index 0000000..a675f4d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * ReduceOperation object when single object being reduced is of
+ * same type as reduced value.
+ *
+ * @param <R> Reduced object type.
+ */
+public abstract class OnSameReduceOperation<R extends Writable>
+ implements ReduceOperation<R, R> {
+ @Override
+ public final void reducePartial(R curValue, R valueToReduce) {
+ reduceSingle(curValue, valueToReduce);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
new file mode 100644
index 0000000..434e21a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Reduce operations defining how to reduce single values
+ * passed on workers, into partial values on workers, and then
+ * into a single global reduced value.
+ *
+ * Implementations should be thread-safe. Most frequently they should be
+ * immutable objects, so that methods can execute concurrently.
+ * In the rare case when an object is mutable ({@link AggregatorReduceOperation}),
+ * i.e. it stores a reusable object inside, accesses should be synchronized.
+ *
+ * @param <S> Single value type, objects passed on workers
+ * @param <R> Reduced value type
+ */
+public interface ReduceOperation<S, R extends Writable> extends Writable {
+ /**
+ * Return new reduced value which is neutral to reduce operation.
+ *
+ * @return Neutral value
+ */
+ R createInitialValue();
+ /**
+ * Add a new value.
+ * Needs to be commutative and associative
+ *
+ * @param curValue Partial value into which to reduce and store the result
+ * @param valueToReduce Single value to be reduced
+ */
+ void reduceSingle(R curValue, S valueToReduce);
+ /**
+ * Add partially reduced value to current partially reduced value.
+ *
+ * @param curValue Partial value into which to reduce and store the result
+ * @param valueToReduce Partial value to be reduced
+ */
+ void reducePartial(R curValue, R valueToReduce);
+}