You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/10/26 03:00:55 UTC
svn commit: r1402363 [2/2] - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/benchmark/
giraph/src/main/java/org/apache/giraph/bsp/
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/aggregators/ giraph...
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java?rev=1402363&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java Fri Oct 26 01:00:54 2012
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.graph.Aggregator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Request through which the master sends final aggregated values to the
+ * worker that owns those aggregators.
+ *
+ * Payload layout, as decoded by {@link #doRequest(ServerData)}: an int entry
+ * count, followed by that many entries of (UTF aggregator name, UTF
+ * aggregator class name, serialized value). For the entry named
+ * {@link AggregatorUtils#SPECIAL_COUNT_AGGREGATOR} the value is a
+ * {@link LongWritable}; for every other entry it is the aggregator's value.
+ */
+public class SendAggregatorsToOwnerRequest extends ByteArrayRequest
+    implements WorkerRequest {
+
+  /**
+   * Constructor
+   *
+   * @param data Serialized aggregator data
+   */
+  public SendAggregatorsToOwnerRequest(byte[] data) {
+    super(data);
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendAggregatorsToOwnerRequest() {
+  }
+
+  /**
+   * Decode every entry of the payload into the server's aggregator data, then
+   * pass the raw payload bytes to
+   * {@code AllAggregatorServerData#receivedRequestFromMaster}.
+   *
+   * @param serverData Data held by this worker's server
+   */
+  @Override
+  public void doRequest(ServerData serverData) {
+    DataInput in = getDataInput();
+    AllAggregatorServerData allData = serverData.getAllAggregatorData();
+    try {
+      int remaining = in.readInt();
+      while (remaining-- > 0) {
+        readEntry(in, serverData, allData);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: " +
+          "IOException occurred while processing request", e);
+    }
+    allData.receivedRequestFromMaster(getData());
+  }
+
+  /**
+   * Read one (name, class name, value) entry from the stream and apply it.
+   *
+   * @param in Stream positioned at the start of an entry
+   * @param serverData Server data, used to reach the owner aggregator data
+   * @param allData Aggregator data that stores values of all aggregators
+   * @throws IOException If the entry cannot be read
+   */
+  private static void readEntry(DataInput in, ServerData serverData,
+      AllAggregatorServerData allData) throws IOException {
+    String name = in.readUTF();
+    // The class name is always on the wire, even for the special count entry,
+    // so it must be consumed before the value.
+    String className = in.readUTF();
+    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+      // Special entry: its LongWritable value goes to
+      // receivedRequestCountFromMaster (presumably the number of requests
+      // the master sends - confirm against the sender).
+      LongWritable requestCount = new LongWritable(0);
+      requestCount.readFields(in);
+      allData.receivedRequestCountFromMaster(requestCount.get());
+      return;
+    }
+    Class<Aggregator<Writable>> aggregatorClass =
+        AggregatorUtils.getAggregatorClass(className);
+    allData.registerAggregatorClass(name, aggregatorClass);
+    Writable value = allData.createAggregatorInitialValue(name);
+    value.readFields(in);
+    allData.setAggregatorValue(name, value);
+    // This worker owns the aggregator, so it is also registered with the
+    // owner-side aggregator data.
+    serverData.getOwnerAggregatorData().registerAggregator(name,
+        aggregatorClass);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST;
+  }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java?rev=1402363&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java Fri Oct 26 01:00:54 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.graph.Aggregator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Request through which the worker that owns aggregators forwards their final
+ * aggregated values to other workers.
+ *
+ * Payload layout, as decoded by {@link #doRequest(ServerData)}: an int entry
+ * count, followed by that many entries of (UTF aggregator name, UTF
+ * aggregator class name, serialized value). For the entry named
+ * {@link AggregatorUtils#SPECIAL_COUNT_AGGREGATOR} the value is a
+ * {@link LongWritable}; for every other entry it is the aggregator's value.
+ */
+public class SendAggregatorsToWorkerRequest extends
+    ByteArrayRequest implements WorkerRequest {
+
+  /**
+   * Constructor
+   *
+   * @param data Serialized aggregator data
+   */
+  public SendAggregatorsToWorkerRequest(byte[] data) {
+    super(data);
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendAggregatorsToWorkerRequest() {
+  }
+
+  /**
+   * Decode every entry of the payload into the all-aggregator data, then
+   * notify it via {@code receivedRequestFromWorker}.
+   *
+   * @param serverData Data held by this worker's server
+   */
+  @Override
+  public void doRequest(ServerData serverData) {
+    DataInput in = getDataInput();
+    AllAggregatorServerData allData = serverData.getAllAggregatorData();
+    try {
+      int remaining = in.readInt();
+      while (remaining-- > 0) {
+        readEntry(in, allData);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: " +
+          "IOException occurred while processing request", e);
+    }
+    allData.receivedRequestFromWorker();
+  }
+
+  /**
+   * Read one (name, class name, value) entry from the stream and apply it.
+   *
+   * @param in Stream positioned at the start of an entry
+   * @param allData Aggregator data that stores values of all aggregators
+   * @throws IOException If the entry cannot be read
+   */
+  private static void readEntry(DataInput in,
+      AllAggregatorServerData allData) throws IOException {
+    String name = in.readUTF();
+    // The class name is always on the wire, even for the special count entry,
+    // so it must be consumed before the value.
+    String className = in.readUTF();
+    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+      // Special entry: its LongWritable value goes to
+      // receivedRequestCountFromWorker (presumably the number of requests
+      // to expect - confirm against the sender).
+      LongWritable requestCount = new LongWritable(0);
+      requestCount.readFields(in);
+      allData.receivedRequestCountFromWorker(requestCount.get());
+      return;
+    }
+    Class<Aggregator<Writable>> aggregatorClass =
+        AggregatorUtils.getAggregatorClass(className);
+    allData.registerAggregatorClass(name, aggregatorClass);
+    Writable value = allData.createAggregatorInitialValue(name);
+    value.readFields(in);
+    allData.setAggregatorValue(name, value);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST;
+  }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java?rev=1402363&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java Fri Oct 26 01:00:54 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Request carrying partial aggregated values for the current superstep, i.e.
+ * values computed by one worker's vertices, sent to the aggregators' owner.
+ *
+ * Payload layout, as decoded by {@link #doRequest(ServerData)}: an int entry
+ * count, followed by that many entries of (UTF aggregator name, serialized
+ * value). Unlike the master/owner broadcasts, no class name is sent. For the
+ * entry named {@link AggregatorUtils#SPECIAL_COUNT_AGGREGATOR} the value is a
+ * {@link LongWritable}.
+ */
+public class SendWorkerAggregatorsRequest extends
+    ByteArrayRequest implements WorkerRequest {
+
+  /**
+   * Constructor
+   *
+   * @param data Serialized aggregator data
+   */
+  public SendWorkerAggregatorsRequest(byte[] data) {
+    super(data);
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendWorkerAggregatorsRequest() {
+  }
+
+  /**
+   * Aggregate every partial value in the payload into the owner aggregator
+   * data, then notify it via {@code receivedRequestFromWorker}.
+   *
+   * @param serverData Data held by this worker's server
+   */
+  @Override
+  public void doRequest(ServerData serverData) {
+    DataInput in = getDataInput();
+    OwnerAggregatorServerData ownerData =
+        serverData.getOwnerAggregatorData();
+    try {
+      int remaining = in.readInt();
+      while (remaining-- > 0) {
+        readPartialValue(in, ownerData);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: " +
+          "IOException occurred while processing request", e);
+    }
+    ownerData.receivedRequestFromWorker();
+  }
+
+  /**
+   * Read one (name, value) entry from the stream and fold it into the owner
+   * aggregator data.
+   *
+   * @param in Stream positioned at the start of an entry
+   * @param ownerData Aggregator data kept by the owner of the aggregators
+   * @throws IOException If the entry cannot be read
+   */
+  private static void readPartialValue(DataInput in,
+      OwnerAggregatorServerData ownerData) throws IOException {
+    String name = in.readUTF();
+    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+      // Special entry: its LongWritable value goes to
+      // receivedRequestCountFromWorker (presumably the number of requests
+      // to expect - confirm against the sender).
+      LongWritable requestCount = new LongWritable(0);
+      requestCount.readFields(in);
+      ownerData.receivedRequestCountFromWorker(requestCount.get());
+      return;
+    }
+    Writable partial = ownerData.createAggregatorInitialValue(name);
+    partial.readFields(in);
+    ownerData.aggregate(name, partial);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_AGGREGATORS_REQUEST;
+  }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java Fri Oct 26 01:00:54 2012
@@ -40,6 +40,10 @@ public class AggregatorsTestVertex exten
private static final String MASTER_WRITE_AGG = "master";
/** Value which master compute will use */
private static final long MASTER_VALUE = 12345;
+ /** Prefix for name of aggregators in array */
+ private static final String ARRAY_PREFIX_AGG = "array";
+ /** Number of aggregators to use in array */
+ private static final int NUM_OF_AGGREGATORS_IN_ARRAY = 100;
@Override
public void compute(Iterable<DoubleWritable> messages) throws IOException {
@@ -62,6 +66,12 @@ public class AggregatorsTestVertex exten
assertEquals(MASTER_VALUE * (1L << superstep),
((LongWritable) getAggregatedValue(MASTER_WRITE_AGG)).get());
+ for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+ aggregate(ARRAY_PREFIX_AGG + i, new LongWritable((superstep + 1) * i));
+ assertEquals(superstep * getTotalNumVertices() * i,
+ ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
+ }
+
if (getSuperstep() == 10) {
voteToHalt();
}
@@ -88,6 +98,11 @@ public class AggregatorsTestVertex exten
}
assertEquals(nv * ((1L << superstep) - 1),
((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+
+ for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+ assertEquals(superstep * getTotalNumVertices() * i,
+ ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
+ }
}
@Override
@@ -97,6 +112,10 @@ public class AggregatorsTestVertex exten
registerPersistentAggregator(PERSISTENT_AGG,
LongSumAggregator.class);
registerAggregator(MASTER_WRITE_AGG, LongSumAggregator.class);
+
+ for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+ registerAggregator(ARRAY_PREFIX_AGG + i, LongSumAggregator.class);
+ }
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Fri Oct 26 01:00:54 2012
@@ -117,9 +117,6 @@ public abstract class BspService<I exten
public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
/** Denotes which workers have been cleaned up */
public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
- /** JSON aggregator value array key */
- public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY =
- "_aggregatorValueArrayKey";
/** JSON partition stats key */
public static final String JSONOBJ_PARTITION_STATS_KEY =
"_partitionStatsKey";
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Fri Oct 26 01:00:54 2012
@@ -611,7 +611,7 @@ public class BspServiceMaster<I extends
}
@Override
- public MasterAggregatorUsage getAggregatorUsage() {
+ public MasterAggregatorHandler getAggregatorHandler() {
return aggregatorHandler;
}
@@ -650,7 +650,6 @@ public class BspServiceMaster<I extends
}
aggregatorHandler.readFields(finalizedStream);
- aggregatorHandler.finishSuperstep(superstep - 1, this);
masterCompute.readFields(finalizedStream);
finalizedStream.close();
@@ -768,7 +767,8 @@ public class BspServiceMaster<I extends
getTaskPartition() -
currentMasterTaskPartitionCounter.getValue());
masterCompute = getConfiguration().createMasterCompute();
- aggregatorHandler = new MasterAggregatorHandler(getConfiguration());
+ aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
+ getContext());
aggregatorHandler.initialize(this);
commService = new NettyMasterClientServer(
@@ -1288,6 +1288,12 @@ public class BspServiceMaster<I extends
chosenWorkerInfoList,
masterGraphPartitioner);
+ // We need to finalize aggregators from previous superstep (send them to
+ // worker owners) after new worker assignments
+ if (getSuperstep() >= 0) {
+ aggregatorHandler.finishSuperstep(commService);
+ }
+
// Finalize the valid checkpoint file prefixes and possibly
// the aggregators.
if (checkpointFrequencyMet(getSuperstep())) {
@@ -1347,9 +1353,8 @@ public class BspServiceMaster<I extends
// Collect aggregator values, then run the master.compute() and
// finally save the aggregator values
- aggregatorHandler.prepareSuperstep(getSuperstep(), this);
+ aggregatorHandler.prepareSuperstep(commService);
runMasterCompute(getSuperstep());
- aggregatorHandler.finishSuperstep(getSuperstep(), this);
// If the master is halted or all the vertices voted to halt and there
// are no more messages in the system, stop the computation
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Fri Oct 26 01:00:54 2012
@@ -28,9 +28,11 @@ import org.apache.giraph.GiraphConfigura
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClient;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerServer;
@@ -105,6 +107,9 @@ public class BspServiceWorker<I extends
private final WorkerClient<I, V, E, M> workerClient;
/** IPC Server */
private final WorkerServer<I, V, E, M> workerServer;
+ /** Request processor for aggregator requests */
+ private final WorkerAggregatorRequestProcessor
+ workerAggregatorRequestProcessor;
/** Master info */
private WorkerInfo masterInfo = new WorkerInfo();
/** List of workers */
@@ -149,14 +154,17 @@ public class BspServiceWorker<I extends
workerClient = new NettyWorkerClient<I, V, E, M>(context,
getConfiguration(), this);
-
+ workerAggregatorRequestProcessor =
+ new NettyWorkerAggregatorRequestProcessor(getContext(),
+ getConfiguration(), this);
workerInfo = new WorkerInfo(
getHostname(), getTaskPartition(), workerServer.getPort());
this.workerContext =
getConfiguration().createWorkerContext(null);
- aggregatorHandler = new WorkerAggregatorHandler();
+ aggregatorHandler =
+ new WorkerAggregatorHandler(this, getConfiguration(), context);
}
@Override
@@ -640,9 +648,6 @@ else[HADOOP_NON_SECURE]*/
addressesAndPartitionsPath);
}
- if (getSuperstep() != INPUT_SUPERSTEP) {
- aggregatorHandler.prepareSuperstep(getSuperstep(), this);
- }
getContext().setStatus("startSuperstep: " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
@@ -684,14 +689,14 @@ else[HADOOP_NON_SECURE]*/
getContext().progress();
}
+ aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+
if (LOG.isInfoEnabled()) {
LOG.info("finishSuperstep: Superstep " + getSuperstep() +
", messages = " + workerSentMessages + " " +
MemoryUtils.getRuntimeMemoryStats());
}
- byte[] aggregatorArray =
- aggregatorHandler.finishSuperstep(getSuperstep());
Collection<PartitionStats> finalizedPartitionStats =
workerGraphPartitioner.finalizePartitionStats(
partitionStatsList, getPartitionStore());
@@ -701,8 +706,6 @@ else[HADOOP_NON_SECURE]*/
WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
JSONObject workerFinishedInfoObj = new JSONObject();
try {
- workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
- Base64.encodeBytes(aggregatorArray));
workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
Base64.encodeBytes(partitionStatsBytes));
workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
@@ -1295,7 +1298,14 @@ else[HADOOP_NON_SECURE]*/
}
@Override
- public WorkerAggregatorUsage getAggregatorUsage() {
+ public WorkerAggregatorHandler getAggregatorHandler() {
return aggregatorHandler;
}
+
+ @Override
+ public void prepareSuperstep() {
+ if (getSuperstep() != INPUT_SUPERSTEP) {
+ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+ }
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Fri Oct 26 01:00:54 2012
@@ -129,7 +129,7 @@ public class GraphMapper<I extends Writa
* @return Worker aggregator usage interface
*/
public final WorkerAggregatorUsage getWorkerAggregatorUsage() {
- return serviceWorker.getAggregatorUsage();
+ return serviceWorker.getAggregatorHandler();
}
/**
@@ -138,7 +138,7 @@ public class GraphMapper<I extends Writa
* @return Master aggregator usage interface
*/
public final MasterAggregatorUsage getMasterAggregatorUsage() {
- return serviceMaster.getAggregatorUsage();
+ return serviceMaster.getAggregatorHandler();
}
public final WorkerContext getWorkerContext() {
@@ -488,6 +488,8 @@ public class GraphMapper<I extends Writa
serviceWorker.storeCheckpoint();
}
+ serviceWorker.prepareSuperstep();
+
serviceWorker.getWorkerContext().setGraphState(graphState);
serviceWorker.getWorkerContext().preSuperstep();
context.progress();
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java Fri Oct 26 01:00:54 2012
@@ -18,51 +18,68 @@
package org.apache.giraph.graph;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.SuperstepState;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.comm.MasterClientServer;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.json.JSONException;
-import org.json.JSONObject;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
-import net.iharder.Base64;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.AbstractMap;
-import java.util.List;
import java.util.Map;
-/** Master implementation of {@link AggregatorHandler} */
-public class MasterAggregatorHandler extends AggregatorHandler implements
- MasterAggregatorUsage, Writable {
+/** Handler for aggregators on master */
+public class MasterAggregatorHandler implements MasterAggregatorUsage,
+ Writable {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(MasterAggregatorHandler.class);
+ /**
+ * Map of aggregators.
+ * This map is used to store final aggregated values received from worker
+ * owners, and also to read and write values provided during master.compute.
+ */
+ private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+ Maps.newHashMap();
/** Aggregator writer */
private final AggregatorWriter aggregatorWriter;
+ /** Progressable used to report progress */
+ private final Progressable progressable;
/**
- * @param config Hadoop configuration
+ * Constructor
+ *
+ * @param conf Giraph configuration
+ * @param progressable Progressable used for reporting progress
*/
- public MasterAggregatorHandler(Configuration config) {
- aggregatorWriter = BspUtils.createAggregatorWriter(config);
+ public MasterAggregatorHandler(
+ ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf,
+ Progressable progressable) {
+ this.progressable = progressable;
+ aggregatorWriter = conf.createAggregatorWriter();
+ }
+
+ @Override
+ public <A extends Writable> A getAggregatedValue(String name) {
+ AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
+ if (aggregator == null) {
+ return null;
+ } else {
+ return (A) aggregator.getPreviousAggregatedValue();
+ }
}
@Override
public <A extends Writable> void setAggregatedValue(String name, A value) {
- AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+ AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
if (aggregator == null) {
throw new IllegalStateException(
"setAggregatedValue: Tried to set value of aggregator which wasn't" +
@@ -75,6 +92,7 @@ public class MasterAggregatorHandler ext
public <A extends Writable> boolean registerAggregator(String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
InstantiationException, IllegalAccessException {
+ checkAggregatorName(name);
return registerAggregator(name, aggregatorClass, false) != null;
}
@@ -82,115 +100,81 @@ public class MasterAggregatorHandler ext
public <A extends Writable> boolean registerPersistentAggregator(String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
InstantiationException, IllegalAccessException {
+ checkAggregatorName(name);
return registerAggregator(name, aggregatorClass, true) != null;
}
/**
- * Get aggregator values supplied by workers for a particular superstep and
- * aggregate them
+ * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as
+ * the name of aggregator. Throw an exception if he tries to use it.
*
- * @param superstep Superstep which we are preparing for
- * @param service BspService to get zookeeper info from
+ * @param name Name of the aggregator to check.
*/
- public void prepareSuperstep(long superstep, BspService service) {
- String workerFinishedPath =
- service.getWorkerFinishedPath(
- service.getApplicationAttempt(), superstep);
- List<String> hostnameIdPathList = null;
- try {
- hostnameIdPathList =
- service.getZkExt().getChildrenExt(
- workerFinishedPath, false, false, true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: InterruptedException", e);
+ private void checkAggregatorName(String name) {
+ if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+ throw new IllegalStateException("checkAggregatorName: " +
+ AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
+ " is not allowed for the name of aggregator");
}
+ }
- for (String hostnameIdPath : hostnameIdPathList) {
- JSONObject workerFinishedInfoObj = null;
- byte[] aggregatorArray = null;
- try {
- byte[] zkData =
- service.getZkExt().getData(hostnameIdPath, false, null);
- workerFinishedInfoObj = new JSONObject(new String(zkData));
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: InterruptedException",
- e);
- } catch (JSONException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: JSONException", e);
- }
- try {
- aggregatorArray = Base64.decode(workerFinishedInfoObj.getString(
- service.JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY));
- } catch (JSONException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("collectAndProcessAggregatorValues: " +
- "No aggregators" + " for " + hostnameIdPath);
- }
- continue;
- } catch (IOException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: IOException", e);
- }
-
- DataInputStream input =
- new DataInputStream(new ByteArrayInputStream(aggregatorArray));
- try {
- while (input.available() > 0) {
- String aggregatorName = input.readUTF();
- AggregatorWrapper<Writable> aggregator =
- getAggregatorMap().get(aggregatorName);
- if (aggregator == null) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "Master received aggregator which isn't registered: " +
- aggregatorName);
- }
- Writable aggregatorValue = aggregator.createInitialValue();
- aggregatorValue.readFields(input);
- aggregator.aggregateCurrent(aggregatorValue);
- }
- } catch (IOException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "IOException when reading aggregator data", e);
- }
+ /**
+ * Helper function for registering aggregators.
+ *
+ * @param name Name of the aggregator
+ * @param aggregatorClass Class of the aggregator
+ * @param persistent Whether aggregator is persistent or not
+ * @param <A> Aggregated value type
+ * @return Newly registered aggregator or aggregator which was previously
+ * created with selected name, if any
+ */
+ private <A extends Writable> AggregatorWrapper<A> registerAggregator
+ (String name, Class<? extends Aggregator<A>> aggregatorClass,
+ boolean persistent) throws InstantiationException,
+ IllegalAccessException {
+ AggregatorWrapper<A> aggregatorWrapper =
+ (AggregatorWrapper<A>) aggregatorMap.get(name);
+ if (aggregatorWrapper == null) {
+ aggregatorWrapper =
+ new AggregatorWrapper<A>(aggregatorClass, persistent);
+ aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
}
+ return aggregatorWrapper;
+ }
- if (LOG.isInfoEnabled()) {
- LOG.info("collectAndProcessAggregatorValues: Processed aggregators");
+ /**
+ * Prepare aggregators for current superstep
+ *
+ * @param commService Communication service
+ */
+ public void prepareSuperstep(MasterClientServer commService) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("prepareSuperstep: Start preapring aggregators");
}
-
// prepare aggregators for master compute
- for (AggregatorWrapper<Writable> aggregator :
- getAggregatorMap().values()) {
+ for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
if (aggregator.isPersistent()) {
aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
}
aggregator.setPreviousAggregatedValue(
aggregator.getCurrentAggregatedValue());
aggregator.resetCurrentAggregator();
+ progressable.progress();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("prepareSuperstep: Aggregators prepared");
}
}
/**
- * Save the supplied aggregator values.
+ * Finalize aggregators for current superstep and share them with workers
*
- * @param superstep Superstep which we are finishing.
- * @param service BspService to get zookeeper info from
+ * @param commService Communication service
*/
- public void finishSuperstep(long superstep, BspService service) {
- Map<String, AggregatorWrapper<Writable>> aggregatorMap =
- getAggregatorMap();
-
+ public void finishSuperstep(MasterClientServer commService) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finishSuperstep: Start finishing aggregators");
+ }
for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
if (aggregator.isChanged()) {
// if master compute changed the value, use the one he chose
@@ -199,74 +183,86 @@ public class MasterAggregatorHandler ext
// reset aggregator for the next superstep
aggregator.resetCurrentAggregator();
}
+ progressable.progress();
}
- if (aggregatorMap.size() > 0) {
- String mergedAggregatorPath =
- service.getMergedAggregatorPath(
- service.getApplicationAttempt(),
- superstep);
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutput output = new DataOutputStream(outputStream);
- try {
- output.writeInt(aggregatorMap.size());
- } catch (IOException e) {
- e.printStackTrace();
- }
+ // send aggregators to their owners
+ // TODO: if aggregator owner and it's value didn't change,
+ // we don't need to resend it
+ try {
for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
aggregatorMap.entrySet()) {
- try {
- output.writeUTF(entry.getKey());
- output.writeUTF(entry.getValue().getAggregatorClass().getName());
- entry.getValue().getPreviousAggregatedValue().write(output);
- } catch (IOException e) {
- throw new IllegalStateException("saveAggregatorValues: " +
- "IllegalStateException", e);
- }
+ commService.sendAggregator(entry.getKey(),
+ entry.getValue().getAggregatorClass(),
+ entry.getValue().getPreviousAggregatedValue());
+ progressable.progress();
}
+ commService.finishSendingAggregatedValues();
+ } catch (IOException e) {
+ throw new IllegalStateException("finishSuperstep: " +
+ "IOException occurred while sending aggregators", e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finishSuperstep: Aggregators finished");
+ }
+ }
- try {
- service.getZkExt().createExt(mergedAggregatorPath,
- outputStream.toByteArray(),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException.NodeExistsException e) {
- LOG.warn("saveAggregatorValues: " +
- mergedAggregatorPath + " already exists!");
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "saveAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
+ /**
+ * Accept aggregated values sent by worker. Every aggregator will be sent
+ * only once, by its owner.
+ * We don't need to count the number of these requests because global
+ * superstep barrier will happen after workers ensure all requests of this
+ * type have been received and processed by master.
+ * Each received value is installed as the current aggregated value of the
+ * registered aggregator with the same name.
+ *
+ * @param aggregatedValuesInput Input in which aggregated values are
+ * written in the following format:
+ * number_of_aggregators
+ * name_1 value_1
+ * name_2 value_2
+ * ...
+ * (number_of_aggregators is read as an int, each name as a UTF string,
+ * and each value through the aggregator's Writable readFields())
+ * @throws IOException if reading from aggregatedValuesInput fails
+ * @throws IllegalStateException if a received name does not match any
+ * registered aggregator
+ */
+ public void acceptAggregatedValues(
+ DataInput aggregatedValuesInput) throws IOException {
+ int numAggregators = aggregatedValuesInput.readInt();
+ for (int i = 0; i < numAggregators; i++) {
+ String aggregatorName = aggregatedValuesInput.readUTF();
+ AggregatorWrapper<Writable> aggregator =
+ aggregatorMap.get(aggregatorName);
+ // Unknown names are treated as an error rather than silently ignored
+ if (aggregator == null) {
throw new IllegalStateException(
- "saveAggregatorValues: IllegalStateException",
- e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("saveAggregatorValues: Finished loading " +
- mergedAggregatorPath);
+ "acceptAggregatedValues: " +
+ "Master received aggregator which isn't registered: " +
+ aggregatorName);
}
+ // Deserialize into a fresh initial value, then install it as current
+ Writable aggregatorValue = aggregator.createInitialValue();
+ aggregatorValue.readFields(aggregatedValuesInput);
+ aggregator.setCurrentAggregatedValue(aggregatorValue);
+ progressable.progress();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("acceptAggregatedValues: Accepted one set with " +
+ numAggregators + " aggregated values");
}
}
/**
* Write aggregators to {@link AggregatorWriter}
*
- * @param superstep Superstep which just finished
+ * @param superstep Superstep which just finished
* @param superstepState State of the superstep which just finished
*/
- public void writeAggregators(long superstep,
- SuperstepState superstepState) {
+ public void writeAggregators(long superstep, SuperstepState superstepState) {
try {
Iterable<Map.Entry<String, Writable>> iter =
Iterables.transform(
- getAggregatorMap().entrySet(),
+ aggregatorMap.entrySet(),
new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
Map.Entry<String, Writable>>() {
@Override
public Map.Entry<String, Writable> apply(
Map.Entry<String, AggregatorWrapper<Writable>> entry) {
+ progressable.progress();
return new AbstractMap.SimpleEntry<String,
Writable>(entry.getKey(),
entry.getValue().getPreviousAggregatedValue());
@@ -292,7 +288,7 @@ public class MasterAggregatorHandler ext
aggregatorWriter.initialize(service.getContext(),
service.getApplicationAttempt());
} catch (IOException e) {
- throw new IllegalStateException("MasterAggregatorHandler: " +
+ throw new IllegalStateException("initialize: " +
"Couldn't initialize aggregatorWriter", e);
}
}
@@ -308,8 +304,6 @@ public class MasterAggregatorHandler ext
@Override
public void write(DataOutput out) throws IOException {
- Map<String, AggregatorWrapper<Writable>> aggregatorMap =
- getAggregatorMap();
out.writeInt(aggregatorMap.size());
for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
aggregatorMap.entrySet()) {
@@ -317,25 +311,34 @@ public class MasterAggregatorHandler ext
out.writeUTF(entry.getValue().getAggregatorClass().getName());
out.writeBoolean(entry.getValue().isPersistent());
entry.getValue().getPreviousAggregatedValue().write(out);
+ progressable.progress();
}
}
+ /**
+ * Replace all registered aggregators with the ones read from the input.
+ * The input holds an int count followed, for each aggregator, by its name
+ * (UTF), aggregator class name (UTF), persistent flag (boolean) and
+ * previous aggregated value (Writable); this is expected to mirror write().
+ *
+ * @param in Input to read the aggregators from
+ * @throws IOException if reading from the input fails
+ * @throws IllegalStateException wrapping an InstantiationException or
+ * IllegalAccessException raised while restoring an aggregator
+ */
@Override
public void readFields(DataInput in) throws IOException {
- Map<String, AggregatorWrapper<Writable>> aggregatorMap =
- getAggregatorMap();
aggregatorMap.clear();
int numAggregators = in.readInt();
- for (int i = 0; i < numAggregators; i++) {
- String aggregatorName = in.readUTF();
- String aggregatorClassName = in.readUTF();
- boolean isPersistent = in.readBoolean();
- AggregatorWrapper<Writable> aggregator =
- registerAggregator(aggregatorName, aggregatorClassName,
- isPersistent);
- Writable value = aggregator.createInitialValue();
- value.readFields(in);
- aggregator.setPreviousAggregatedValue(value);
+ try {
+ for (int i = 0; i < numAggregators; i++) {
+ String aggregatorName = in.readUTF();
+ String aggregatorClassName = in.readUTF();
+ boolean isPersistent = in.readBoolean();
+ // NOTE(review): the reflective exceptions caught below presumably come
+ // from resolving/instantiating the aggregator class here - confirm
+ AggregatorWrapper<Writable> aggregator = registerAggregator(
+ aggregatorName,
+ AggregatorUtils.getAggregatorClass(aggregatorClassName),
+ isPersistent);
+ // The restored value becomes the previous (not current) aggregated value
+ Writable value = aggregator.createInitialValue();
+ value.readFields(in);
+ aggregator.setPreviousAggregatedValue(value);
+ progressable.progress();
+ }
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("readFields: " +
+ "InstantiationException occurred", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("readFields: " +
+ "IllegalAccessException occurred", e);
}
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java Fri Oct 26 01:00:54 2012
@@ -18,134 +18,199 @@
package org.apache.giraph.graph;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.util.Map;
/**
- * Worker implementation of {@link AggregatorHandler}
+ * Handler for aggregators on worker. Provides the aggregated values and
+ * performs aggregations from user vertex code (thread-safe). Also has
+ * methods for all superstep coordination related to aggregators.
+ *
+ * At the beginning of every superstep each worker calls prepareSuperstep(),
+ * which blocks until the final aggregates from the previous superstep have
+ * been delivered to the worker.
+ * Next, during the superstep the worker can call aggregate() and
+ * getAggregatedValue() (both methods are thread safe): the former
+ * computes partial aggregates for this superstep from the worker, and
+ * the latter returns (read-only) final aggregates from the previous superstep.
+ * Finally, at the end of the superstep, the worker calls finishSuperstep(),
+ * which propagates non-owned partial aggregates to the owner workers,
+ * and sends the final aggregate from the owner worker to the master.
*/
-public class WorkerAggregatorHandler extends AggregatorHandler implements
- WorkerAggregatorUsage {
+public class WorkerAggregatorHandler implements WorkerAggregatorUsage {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(WorkerAggregatorHandler.class);
+ /** Map of values from previous superstep */
+ private final Map<String, Writable> previousAggregatedValueMap =
+ Maps.newHashMap();
+ /** Map of aggregators for current superstep */
+ private final Map<String, Aggregator<Writable>> currentAggregatorMap =
+ Maps.newHashMap();
+ /** Service worker */
+ private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+ /** Progressable for reporting progress */
+ private final Progressable progressable;
+ /** How big a single aggregator request can be */
+ private final int maxBytesPerAggregatorRequest;
+
+ /**
+ * Constructor
+ *
+ * @param serviceWorker Service worker
+ * @param conf Giraph configuration
+ * @param progressable Progressable for reporting progress
+ */
+ public WorkerAggregatorHandler(
+ CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+ ImmutableClassesGiraphConfiguration conf,
+ Progressable progressable) {
+ this.serviceWorker = serviceWorker;
+ this.progressable = progressable;
+ maxBytesPerAggregatorRequest = conf.getInt(
+ AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
+ AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
+ }
@Override
public <A extends Writable> void aggregate(String name, A value) {
- AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+ Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
if (aggregator != null) {
- ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
+ // TODO we can later improve this for multithreading to have local
+ // copies of aggregators per thread
+ synchronized (aggregator) {
+ aggregator.aggregate(value);
+ }
} else {
throw new IllegalStateException("aggregate: Tried to aggregate value " +
"to unregistered aggregator " + name);
}
}
+ @Override
+ @SuppressWarnings("unchecked")
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return (A) previousAggregatedValueMap.get(name);
+ }
+
/**
- * Get aggregator values aggregated by master in previous superstep
+ * Prepare aggregators for current superstep: wait for this worker's
+ * aggregators from the master, distribute them with requestProcessor, then
+ * wait for all other aggregators and store them in the local maps.
*
- * @param superstep Superstep which we are preparing for
- * @param service BspService to get zookeeper info from
+ * @param requestProcessor Request processor for aggregators
+ * @throws IllegalStateException if distributing the aggregators fails
+ * with an IOException
*/
- public void prepareSuperstep(long superstep, BspService service) {
- // prepare aggregators for reading and next superstep
- for (AggregatorWrapper<Writable> aggregator :
- getAggregatorMap().values()) {
- aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
- aggregator.resetCurrentAggregator();
- }
- String mergedAggregatorPath =
- service.getMergedAggregatorPath(service.getApplicationAttempt(),
- superstep - 1);
-
- byte[] aggregatorArray;
- try {
- aggregatorArray =
- service.getZkExt().getData(mergedAggregatorPath, false, null);
- } catch (KeeperException.NoNodeException e) {
- LOG.info("getAggregatorValues: no aggregators in " +
- mergedAggregatorPath + " on superstep " + superstep);
- return;
- } catch (KeeperException e) {
- throw new IllegalStateException("Failed to get data for " +
- mergedAggregatorPath + " with KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("Failed to get data for " +
- mergedAggregatorPath + " with InterruptedException", e);
- }
-
- DataInput input =
- new DataInputStream(new ByteArrayInputStream(aggregatorArray));
- int numAggregators = 0;
-
+ public void prepareSuperstep(
+ WorkerAggregatorRequestProcessor requestProcessor) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("prepareSuperstep: Start preparing aggregators");
+ }
+ AllAggregatorServerData allAggregatorData =
+ serviceWorker.getServerData().getAllAggregatorData();
+ // Wait for my aggregators
+ Iterable<byte[]> dataToDistribute =
+ allAggregatorData.getDataFromMasterWhenReady();
try {
- numAggregators = input.readInt();
+ // Distribute my aggregators
+ requestProcessor.distributeAggregators(dataToDistribute);
} catch (IOException e) {
- throw new IllegalStateException("getAggregatorValues: " +
- "Failed to decode data", e);
+ throw new IllegalStateException("prepareSuperstep: " +
+ "IOException occurred while trying to distribute aggregators", e);
+ }
+ // Wait for all other aggregators and store them
+ allAggregatorData.fillNextSuperstepMapsWhenReady(
+ serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap,
+ currentAggregatorMap);
+ allAggregatorData.reset();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("prepareSuperstep: Aggregators prepared");
}
+ }
- for (int i = 0; i < numAggregators; i++) {
+ /**
+ * Send aggregators to their owners and in the end to the master.
+ * Partial values of aggregators owned by this worker are aggregated
+ * locally. Once partial values from all workers have arrived, the final
+ * values owned by this worker are sent to the master. A request is sent
+ * as soon as its buffered data grows past maxBytesPerAggregatorRequest
+ * bytes. The method waits for all outstanding requests before returning.
+ *
+ * @param requestProcessor Request processor for aggregators
+ */
+ public void finishSuperstep(
+ WorkerAggregatorRequestProcessor requestProcessor) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finishSuperstep: Start finishing aggregators");
+ }
+ OwnerAggregatorServerData ownerAggregatorData =
+ serviceWorker.getServerData().getOwnerAggregatorData();
+ // First send partial aggregated values to their owners and determine
+ // which aggregators belong to this worker
+ for (Map.Entry<String, Aggregator<Writable>> entry :
+ currentAggregatorMap.entrySet()) {
try {
- String aggregatorName = input.readUTF();
- String aggregatorClassName = input.readUTF();
- AggregatorWrapper<Writable> aggregator =
- registerAggregator(aggregatorName, aggregatorClassName, false);
- Writable aggregatorValue = aggregator.createInitialValue();
- aggregatorValue.readFields(input);
- aggregator.setPreviousAggregatedValue(aggregatorValue);
+ boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
+ entry.getValue().getAggregatedValue());
+ if (!sent) {
+ // If it's my aggregator, add it directly
+ ownerAggregatorData.aggregate(entry.getKey(),
+ entry.getValue().getAggregatedValue());
+ }
} catch (IOException e) {
- throw new IllegalStateException(
- "Failed to decode data for index " + i, e);
+ throw new IllegalStateException("finishSuperstep: " +
+ "IOException occurred while sending aggregator " +
+ entry.getKey() + " to its owner", e);
}
+ progressable.progress();
}
-
- if (LOG.isInfoEnabled()) {
- LOG.info("getAggregatorValues: Finished loading " +
- mergedAggregatorPath);
+ try {
+ // Flush
+ requestProcessor.flush();
+ } catch (IOException e) {
+ throw new IllegalStateException("finishSuperstep: " +
+ "IOException occurred while sending aggregators to owners", e);
}
- }
- /**
- * Put aggregator values of the worker to a byte array that will later be
- * aggregated by master.
- *
- * @param superstep Superstep which we are finishing.
- * @return Byte array of the aggreagtor values
- */
- public byte[] finishSuperstep(long superstep) {
- if (superstep == BspService.INPUT_SUPERSTEP) {
- return new byte[0];
- }
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream output = new DataOutputStream(outputStream);
- for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
- getAggregatorMap().entrySet()) {
- if (entry.getValue().isChanged()) {
- try {
- output.writeUTF(entry.getKey());
- entry.getValue().getCurrentAggregatedValue().write(output);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to marshall aggregator " +
- "with IOException " + entry.getKey(), e);
+ // Wait to receive partial aggregated values from all other workers
+ Iterable<Map.Entry<String, Writable>> myAggregators =
+ ownerAggregatorData.getMyAggregatorValuesWhenReady(
+ serviceWorker.getWorkerInfoList().size());
+
+ // Send final aggregated values to master
+ AggregatedValueOutputStream aggregatorOutput =
+ new AggregatedValueOutputStream();
+ for (Map.Entry<String, Writable> entry : myAggregators) {
+ try {
+ int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
+ entry.getValue());
+ if (currentSize > maxBytesPerAggregatorRequest) {
+ requestProcessor.sendAggregatedValuesToMaster(
+ aggregatorOutput.flush());
}
+ progressable.progress();
+ } catch (IOException e) {
+ throw new IllegalStateException("finishSuperstep: " +
+ "IOException occurred while writing aggregator " +
+ entry.getKey(), e);
}
}
+ try {
+ requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
+ } catch (IOException e) {
+ throw new IllegalStateException("finishSuperstep: " +
+ "IOException occurred while sending aggregators to master", e);
+ }
+ // Wait for master to receive aggregated values before proceeding
+ serviceWorker.getWorkerClient().waitAllRequests();
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "marshalAggregatorValues: Finished assembling aggregator values");
+ ownerAggregatorData.reset();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finishSuperstep: Aggregators finished");
}
- return outputStream.toByteArray();
}
}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java?rev=1402363&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java Fri Oct 26 01:00:54 2012
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+/**
+ * Barrier that waits until permits have been required an expected number
+ * of times and every required permit has been released again.
+ *
+ * User must follow this protocol for concurrent access:
+ *
+ * (1) an object instance is constructed
+ * (2) arbitrarily many times
+ * (2a) concurrent calls to requirePermits(), releasePermits() and
+ * waitForRequiredPermits() are issued
+ * (2b) waitForRequiredPermits() returns
+ *
+ * Note that the next cycle of calls to requirePermits() or releasePermits()
+ * cannot start until the previous call to waitForRequiredPermits()
+ * has returned.
+ *
+ * Methods of this class are thread-safe.
+ */
+public class ExpectedBarrier {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(ExpectedBarrier.class);
+ /**
+ * Msecs between wake-ups while waiting (progress is reported after every
+ * wake-up); also the period passed to the timed logger
+ */
+ private static final int MSEC_PERIOD = 10000;
+ /** Progressable for reporting progress */
+ private final Progressable progressable;
+ /** Number of requirePermits() calls since the last reset */
+ private long timesRequired = 0;
+ /**
+ * Required permits not yet released; may become negative if permits are
+ * released before they are required
+ */
+ private long waitingOnPermits = 0;
+ /** Timed logger built over LOG, used for messages while waiting */
+ private final TimedLogger logger;
+
+ /**
+ * Constructor
+ *
+ * @param progressable Progressable for reporting progress
+ */
+ public ExpectedBarrier(Progressable progressable) {
+ this.progressable = progressable;
+ logger = new TimedLogger(MSEC_PERIOD, LOG);
+ }
+
+ /**
+ * Wait until permits have been required desired number of times,
+ * and all required permits have been released. Both counters are reset
+ * to zero before returning, ready for the next cycle.
+ *
+ * @param desiredTimesRequired How many times should permits have been
+ * required
+ * @throws IllegalStateException if the thread is interrupted while
+ * waiting (the thread's interrupt status is restored first)
+ */
+ public synchronized void waitForRequiredPermits(
+ long desiredTimesRequired) {
+ while (timesRequired < desiredTimesRequired || waitingOnPermits > 0) {
+ try {
+ wait(MSEC_PERIOD);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag for callers up the stack and keep the
+ // original exception as the cause for diagnostics
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("waitForRequiredPermits: " +
+ "InterruptedException occurred", e);
+ }
+ progressable.progress();
+ if (LOG.isInfoEnabled()) {
+ if (timesRequired < desiredTimesRequired) {
+ logger.info("waitForRequiredPermits: " +
+ "Waiting for times required to be " + desiredTimesRequired +
+ " (currently " + timesRequired + ") ");
+ } else {
+ logger.info("waitForRequiredPermits: " +
+ "Waiting for " + waitingOnPermits + " more permits.");
+ }
+ }
+ }
+
+ // Reset for the next time to use
+ timesRequired = 0;
+ waitingOnPermits = 0;
+ }
+
+ /**
+ * Require more permits. This will increase the number of times permits
+ * were required and wake up any waiting thread. Doesn't wait for permits
+ * to become available.
+ *
+ * @param permits Number of permits to require
+ */
+ public synchronized void requirePermits(long permits) {
+ timesRequired++;
+ waitingOnPermits += permits;
+ notifyAll();
+ }
+
+ /**
+ * Release one permit.
+ */
+ public synchronized void releaseOnePermit() {
+ releasePermits(1);
+ }
+
+ /**
+ * Release some permits and wake up any thread waiting in
+ * waitForRequiredPermits().
+ *
+ * @param permits Number of permits to release
+ */
+ public synchronized void releasePermits(long permits) {
+ waitingOnPermits -= permits;
+ notifyAll();
+ }
+}
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java Fri Oct 26 01:00:54 2012
@@ -20,8 +20,10 @@ package org.apache.giraph.graph;
import org.apache.giraph.BspCase;
import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.examples.AggregatorsTestVertex;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimplePageRankVertex;
@@ -30,7 +32,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -41,6 +47,8 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
/** Tests if aggregators are handled on a proper way */
public class TestAggregatorsHandling extends BspCase {
@@ -49,6 +57,21 @@ public class TestAggregatorsHandling ext
super(TestAggregatorsHandling.class.getName());
}
+ /**
+ * Read the private aggregatorMap field of a MasterAggregatorHandler via
+ * reflection, since the tests can no longer call getAggregatorMap() on it.
+ *
+ * @param aggregatorHandler Handler whose aggregator map should be returned
+ * @return The handler's internal aggregator map (the same instance, not a
+ * copy)
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, AggregatorWrapper<Writable>> getAggregatorMap(
+ MasterAggregatorHandler aggregatorHandler) {
+ try {
+ // Look the field up on the declaring class rather than getClass(), so
+ // this keeps working if a subclass instance is passed in
+ Field aggregatorMapField =
+ MasterAggregatorHandler.class.getDeclaredField("aggregatorMap");
+ aggregatorMapField.setAccessible(true);
+ return (Map<String, AggregatorWrapper<Writable>>)
+ aggregatorMapField.get(aggregatorHandler);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ } catch (NoSuchFieldException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
/** Tests if aggregators are handled on a proper way during supersteps */
@Test
public void testAggregatorsHandling() throws IOException,
@@ -58,6 +81,9 @@ public class TestAggregatorsHandling ext
SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
job.getConfiguration().setMasterComputeClass(
AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+ // test with aggregators split in a few requests
+ job.getConfiguration().setInt(
+ AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
assertTrue(job.run(true));
}
@@ -65,8 +91,13 @@ public class TestAggregatorsHandling ext
@Test
public void testMasterAggregatorsSerialization() throws
IllegalAccessException, InstantiationException, IOException {
+ ImmutableClassesGiraphConfiguration conf =
+ Mockito.mock(ImmutableClassesGiraphConfiguration.class);
+ Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
+ TextAggregatorWriter.class);
+ Progressable progressable = Mockito.mock(Progressable.class);
MasterAggregatorHandler handler =
- new MasterAggregatorHandler(new Configuration());
+ new MasterAggregatorHandler(conf, progressable);
String regularAggName = "regular";
LongWritable regularValue = new LongWritable(5);
@@ -80,7 +111,7 @@ public class TestAggregatorsHandling ext
handler.setAggregatedValue(persistentAggName, persistentValue);
for (AggregatorWrapper<Writable> aggregator :
- handler.getAggregatorMap().values()) {
+ getAggregatorMap(handler).values()) {
aggregator.setPreviousAggregatedValue(
aggregator.getCurrentAggregatedValue());
}
@@ -89,14 +120,14 @@ public class TestAggregatorsHandling ext
handler.write(new DataOutputStream(out));
MasterAggregatorHandler restartedHandler =
- new MasterAggregatorHandler(new Configuration());
+ new MasterAggregatorHandler(conf, progressable);
restartedHandler.readFields(
new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
- assertEquals(2, restartedHandler.getAggregatorMap().size());
+ assertEquals(2, getAggregatorMap(restartedHandler).size());
AggregatorWrapper<Writable> regularAgg =
- restartedHandler.getAggregatorMap().get(regularAggName);
+ getAggregatorMap(restartedHandler).get(regularAggName);
assertTrue(
regularAgg.getAggregatorClass().equals(LongSumAggregator.class));
assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
@@ -105,7 +136,7 @@ public class TestAggregatorsHandling ext
assertFalse(regularAgg.isPersistent());
AggregatorWrapper<Writable> persistentAgg =
- restartedHandler.getAggregatorMap().get(persistentAggName);
+ getAggregatorMap(restartedHandler).get(persistentAggName);
assertTrue(persistentAgg.getAggregatorClass().equals
(DoubleOverwriteAggregator.class));
assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());