You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC
[16/23] GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
deleted file mode 100644
index ef1e2ff..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Map;
-
-/** Handler for aggregators on master */
-public class MasterAggregatorHandler implements MasterAggregatorUsage,
- Writable {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(MasterAggregatorHandler.class);
- /**
- * Map of aggregators.
- * This map is used to store final aggregated values received from worker
- * owners, and also to read and write values provided during master.compute.
- */
- private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
- Maps.newHashMap();
- /** Aggregator writer */
- private final AggregatorWriter aggregatorWriter;
- /** Progressable used to report progress */
- private final Progressable progressable;
-
-  /**
-   * Creates a handler that reports progress through the given
-   * {@link Progressable} and obtains its aggregator writer from the
-   * configuration.
-   *
-   * @param conf Giraph configuration
-   * @param progressable Progressable used for reporting progress
-   */
-  public MasterAggregatorHandler(
-      ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf,
-      Progressable progressable) {
-    aggregatorWriter = conf.createAggregatorWriter();
-    this.progressable = progressable;
-  }
-
-  /**
-   * Returns the previous aggregated value stored for an aggregator.
-   *
-   * @param name Name of the aggregator
-   * @param <A> Aggregated value type
-   * @return Previous aggregated value of the aggregator, or null when no
-   *         aggregator is registered under the given name
-   */
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    AggregatorWrapper<? extends Writable> wrapper = aggregatorMap.get(name);
-    return wrapper == null ?
-        null : (A) wrapper.getPreviousAggregatedValue();
-  }
-
-  /**
-   * Overwrites the current aggregated value of a registered aggregator.
-   *
-   * @param name Name of the aggregator
-   * @param value Value to set
-   * @param <A> Aggregated value type
-   * @throws IllegalStateException if no aggregator is registered under
-   *         the given name
-   */
-  @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    AggregatorWrapper<? extends Writable> wrapper = aggregatorMap.get(name);
-    if (wrapper != null) {
-      ((AggregatorWrapper<A>) wrapper).setCurrentAggregatedValue(value);
-      return;
-    }
-    throw new IllegalStateException(
-        "setAggregatedValue: Tried to set value of aggregator which wasn't" +
-        " registered " + name);
-  }
-
-  /**
-   * Registers a non-persistent aggregator under the given name. If an
-   * aggregator with that name already exists it is left untouched.
-   *
-   * @param name Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
-   * @param <A> Aggregated value type
-   * @return True iff aggregator wasn't already registered
-   * @throws IllegalStateException if the name equals
-   *         AggregatorUtils.SPECIAL_COUNT_AGGREGATOR
-   */
-  @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    // The private helper returns the existing wrapper when the name is
-    // already taken, so its result is never null. Check the map up front
-    // to report whether this call actually added the aggregator.
-    boolean newlyRegistered = !aggregatorMap.containsKey(name);
-    registerAggregator(name, aggregatorClass, false);
-    return newlyRegistered;
-  }
-
-  /**
-   * Registers a persistent aggregator under the given name. If an
-   * aggregator with that name already exists it is left untouched.
-   *
-   * @param name Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
-   * @param <A> Aggregated value type
-   * @return True iff aggregator wasn't already registered
-   * @throws IllegalStateException if the name equals
-   *         AggregatorUtils.SPECIAL_COUNT_AGGREGATOR
-   */
-  @Override
-  public <A extends Writable> boolean registerPersistentAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    // The private helper returns the existing wrapper when the name is
-    // already taken, so its result is never null. Check the map up front
-    // to report whether this call actually added the aggregator.
-    boolean newlyRegistered = !aggregatorMap.containsKey(name);
-    registerAggregator(name, aggregatorClass, true);
-    return newlyRegistered;
-  }
-
-  /**
-   * Rejects the reserved aggregator name: users may not register an
-   * aggregator called AggregatorUtils.SPECIAL_COUNT_AGGREGATOR.
-   *
-   * @param name Name of the aggregator to check.
-   */
-  private void checkAggregatorName(String name) {
-    boolean reserved = name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR);
-    if (!reserved) {
-      return;
-    }
-    throw new IllegalStateException("checkAggregatorName: " +
-        AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
-        " is not allowed for the name of aggregator");
-  }
-
-  /**
-   * Looks up the aggregator stored under a name, creating and storing a new
-   * wrapper when none exists yet.
-   *
-   * @param name Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
-   * @param persistent Whether aggregator is persistent or not
-   * @param <A> Aggregated value type
-   * @return Newly registered aggregator or aggregator which was previously
-   *         created with selected name, if any
-   */
-  private <A extends Writable> AggregatorWrapper<A> registerAggregator
-  (String name, Class<? extends Aggregator<A>> aggregatorClass,
-      boolean persistent) throws InstantiationException,
-      IllegalAccessException {
-    AggregatorWrapper<A> existing =
-        (AggregatorWrapper<A>) aggregatorMap.get(name);
-    if (existing != null) {
-      return existing;
-    }
-    AggregatorWrapper<A> created =
-        new AggregatorWrapper<A>(aggregatorClass, persistent);
-    aggregatorMap.put(name, (AggregatorWrapper<Writable>) created);
-    return created;
-  }
-
- /**
- * Prepare aggregators for current superstep.
- *
- * For every registered aggregator: a persistent aggregator first folds its
- * previous value into the current one, then the current value is stored as
- * the previous value and the current aggregator is reset. Progress is
- * reported once per aggregator.
- *
- * @param masterClient IPC client on master (not referenced in this
- * method's body)
- */
- public void prepareSuperstep(MasterClient masterClient) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("prepareSuperstep: Start preapring aggregators");
- }
- // prepare aggregators for master compute
- for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
- // persistent aggregators carry the previous value over
- if (aggregator.isPersistent()) {
- aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
- }
- // the value read through getAggregatedValue() is the "previous" one
- aggregator.setPreviousAggregatedValue(
- aggregator.getCurrentAggregatedValue());
- aggregator.resetCurrentAggregator();
- progressable.progress();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("prepareSuperstep: Aggregators prepared");
- }
- }
-
- /**
- * Finalize aggregators for current superstep and share them with workers.
- *
- * Aggregators reported as changed have their current value promoted to the
- * previous value and are reset; afterwards every aggregator's name, class
- * and previous value is sent through the master client.
- *
- * @param masterClient IPC client on master
- * @throws IllegalStateException if sending the aggregators fails with an
- * IOException
- */
- public void finishSuperstep(MasterClient masterClient) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("finishSuperstep: Start finishing aggregators");
- }
- for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
- if (aggregator.isChanged()) {
- // if master compute changed the value, use the one it chose
- aggregator.setPreviousAggregatedValue(
- aggregator.getCurrentAggregatedValue());
- // reset aggregator for the next superstep
- aggregator.resetCurrentAggregator();
- }
- progressable.progress();
- }
-
- // send aggregators to their owners
- // TODO: if aggregator owner and its value didn't change,
- // we don't need to resend it
- try {
- for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
- aggregatorMap.entrySet()) {
- masterClient.sendAggregator(entry.getKey(),
- entry.getValue().getAggregatorClass(),
- entry.getValue().getPreviousAggregatedValue());
- progressable.progress();
- }
- // called once, after all aggregators have been handed to the client
- masterClient.finishSendingAggregatedValues();
- } catch (IOException e) {
- throw new IllegalStateException("finishSuperstep: " +
- "IOException occurred while sending aggregators", e);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("finishSuperstep: Aggregators finished");
- }
- }
-
-  /**
-   * Reads a batch of aggregated values sent by a worker and stores each one
-   * as the current value of the matching registered aggregator. Every
-   * aggregator will be sent only once, by its owner.
-   * There is no need to count these requests because the global superstep
-   * barrier happens after workers ensure all requests of this type have
-   * been received and processed by master.
-   *
-   * @param aggregatedValuesInput Input in which aggregated values are
-   *                              written in the following format:
-   *                              number_of_aggregators
-   *                              name_1  value_1
-   *                              name_2  value_2
-   *                              ...
-   * @throws IOException if reading from the input fails
-   * @throws IllegalStateException if a received name is not registered
-   */
-  public void acceptAggregatedValues(
-      DataInput aggregatedValuesInput) throws IOException {
-    final int numAggregators = aggregatedValuesInput.readInt();
-    for (int received = 0; received < numAggregators; ++received) {
-      String aggregatorName = aggregatedValuesInput.readUTF();
-      AggregatorWrapper<Writable> wrapper = aggregatorMap.get(aggregatorName);
-      if (wrapper == null) {
-        throw new IllegalStateException(
-            "acceptAggregatedValues: " +
-            "Master received aggregator which isn't registered: " +
-            aggregatorName);
-      }
-      // deserialize into a fresh value created by the aggregator itself
-      Writable incoming = wrapper.createInitialValue();
-      incoming.readFields(aggregatedValuesInput);
-      wrapper.setCurrentAggregatedValue(incoming);
-      progressable.progress();
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("acceptAggregatedValues: Accepted one set with " +
-          numAggregators + " aggregated values");
-    }
-  }
-
-  /**
-   * Write aggregators to {@link AggregatorWriter}.
-   *
-   * The writer receives a transformed view of the aggregator map in which
-   * every entry maps the aggregator name to its previous aggregated value;
-   * progress is reported each time an entry is transformed. The superstep
-   * passed to the writer is AggregatorWriter.LAST_SUPERSTEP when the state
-   * is ALL_SUPERSTEPS_DONE, otherwise the given superstep.
-   *
-   * @param superstep Superstep which just finished
-   * @param superstepState State of the superstep which just finished
-   * @throws IllegalStateException if the writer fails with an IOException
-   */
-  public void writeAggregators(long superstep, SuperstepState superstepState) {
-    try {
-      Iterable<Map.Entry<String, Writable>> iter =
-          Iterables.transform(
-              aggregatorMap.entrySet(),
-              new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
-                  Map.Entry<String, Writable>>() {
-                @Override
-                public Map.Entry<String, Writable> apply(
-                    Map.Entry<String, AggregatorWrapper<Writable>> entry) {
-                  progressable.progress();
-                  return new AbstractMap.SimpleEntry<String,
-                      Writable>(entry.getKey(),
-                      entry.getValue().getPreviousAggregatedValue());
-                }
-              });
-      aggregatorWriter.writeAggregator(iter,
-          (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
-              AggregatorWriter.LAST_SUPERSTEP : superstep);
-    } catch (IOException e) {
-      // Prefix with this method's name, like every other message in the
-      // class, so the failure is attributed to the right place.
-      throw new IllegalStateException(
-          "writeAggregators: IOException while " +
-          "writing aggregators data", e);
-    }
-  }
-
- /**
- * Initialize {@link AggregatorWriter} with the service's context and
- * application attempt.
- *
- * @param service BspService
- * @throws IllegalStateException if the writer's initialization fails with
- * an IOException
- */
- public void initialize(BspService service) {
- try {
- aggregatorWriter.initialize(service.getContext(),
- service.getApplicationAttempt());
- } catch (IOException e) {
- // surface the checked exception as unchecked, keeping the cause
- throw new IllegalStateException("initialize: " +
- "Couldn't initialize aggregatorWriter", e);
- }
- }
-
- /**
- * Close {@link AggregatorWriter}
- *
- * @throws IOException propagated from the writer's close()
- */
- public void close() throws IOException {
- aggregatorWriter.close();
- }
-
-  /**
-   * Serializes all aggregators: first the number of aggregators, then for
-   * each one its name, aggregator class name, persistence flag and previous
-   * aggregated value.
-   *
-   * @param out Output to write to
-   * @throws IOException if writing fails
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(aggregatorMap.size());
-    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-        aggregatorMap.entrySet()) {
-      AggregatorWrapper<Writable> wrapper = entry.getValue();
-      out.writeUTF(entry.getKey());
-      out.writeUTF(wrapper.getAggregatorClass().getName());
-      out.writeBoolean(wrapper.isPersistent());
-      wrapper.getPreviousAggregatedValue().write(out);
-      progressable.progress();
-    }
-  }
-
-  /**
-   * Replaces the current aggregators with the ones read from the input,
-   * which must be in the format produced by {@link #write(DataOutput)}.
-   * Each value read is stored as the aggregator's previous aggregated value.
-   *
-   * @param in Input to read from
-   * @throws IOException if reading fails
-   * @throws IllegalStateException if registering an aggregator fails
-   */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    aggregatorMap.clear();
-    final int count = in.readInt();
-    try {
-      for (int index = 0; index < count; ++index) {
-        String name = in.readUTF();
-        String className = in.readUTF();
-        boolean persistent = in.readBoolean();
-        AggregatorWrapper<Writable> wrapper = registerAggregator(
-            name,
-            AggregatorUtils.getAggregatorClass(className),
-            persistent);
-        Writable previousValue = wrapper.createInitialValue();
-        previousValue.readFields(in);
-        wrapper.setPreviousAggregatedValue(previousValue);
-        progressable.progress();
-      }
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("readFields: " +
-          "InstantiationException occurred", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("readFields: " +
-          "IllegalAccessException occurred", e);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
deleted file mode 100644
index 6e6571b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Master compute can access and change aggregators through this interface
- */
-public interface MasterAggregatorUsage {
- /**
- * Register an aggregator in preSuperstep() and/or preApplication(). This
- * aggregator will have its value reset at the end of each super step.
- *
- * @param name of aggregator
- * @param aggregatorClass Class type of the aggregator
- * @param <A> Aggregator type
- * @return True iff aggregator wasn't already registered
- * @throws InstantiationException presumably when the aggregator class
- * cannot be instantiated (TODO confirm against implementations)
- * @throws IllegalAccessException presumably when the aggregator class or
- * its constructor is not accessible (TODO confirm against implementations)
- */
- <A extends Writable> boolean registerAggregator(String name,
- Class<? extends Aggregator<A>> aggregatorClass) throws
- InstantiationException, IllegalAccessException;
-
- /**
- * Register persistent aggregator in preSuperstep() and/or
- * preApplication(). This aggregator will not reset value at the end of
- * super step.
- *
- * @param name of aggregator
- * @param aggregatorClass Class type of the aggregator
- * @param <A> Aggregator type
- * @return True iff aggregator wasn't already registered
- * @throws InstantiationException presumably when the aggregator class
- * cannot be instantiated (TODO confirm against implementations)
- * @throws IllegalAccessException presumably when the aggregator class or
- * its constructor is not accessible (TODO confirm against implementations)
- */
- <A extends Writable> boolean registerPersistentAggregator(String name,
- Class<? extends Aggregator<A>> aggregatorClass) throws
- InstantiationException, IllegalAccessException;
-
- /**
- * Get value of an aggregator.
- *
- * @param name Name of aggregator
- * @param <A> Aggregated value
- * @return Value of the aggregator
- */
- <A extends Writable> A getAggregatedValue(String name);
-
- /**
- * Sets value of an aggregator.
- *
- * @param name Name of aggregator
- * @param value Value to set
- * @param <A> Aggregated value
- */
- <A extends Writable> void setAggregatedValue(String name, A value);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterCompute.java
deleted file mode 100644
index 4641621..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterCompute.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Abstract base class for defining a master vertex that can perform
- * centralized computation between supersteps. This class will be
- * instantiated on the master node and will run every superstep before the
- * workers do.
- *
- * Communication with the workers should be performed via aggregators. The
- * values of the aggregators are broadcast to the workers before
- * vertex.compute() is called and collected by the master before
- * master.compute() is called. This means aggregator values used by the workers
- * are consistent with aggregator values from the master from the same
- * superstep, and aggregator values used by the master are consistent with
- * aggregator values from the workers from the previous superstep. Note that
- * the master has to register its own aggregators (it does not call
- * {@link WorkerContext} functions), but it uses all aggregators by default,
- * so useAggregator does not have to be called.
- */
-@SuppressWarnings("rawtypes")
-public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
- ImmutableClassesGiraphConfigurable {
- /** If true, do not do anymore computation on this vertex. */
- private boolean halt = false;
- /** Global graph state **/
- private GraphState graphState;
- /** Configuration */
- private ImmutableClassesGiraphConfiguration conf;
-
- /**
- * Must be defined by user to specify what the master has to do.
- */
- public abstract void compute();
-
- /**
- * Initialize the MasterCompute class, this is the place to register
- * aggregators.
- */
- public abstract void initialize() throws InstantiationException,
- IllegalAccessException;
-
- /**
- * Retrieves the current superstep.
- *
- * @return Current superstep
- */
- public long getSuperstep() {
- return getGraphState().getSuperstep();
- }
-
- /**
- * Get the total (all workers) number of vertices that
- * existed in the previous superstep.
- *
- * @return Total number of vertices (-1 if first superstep)
- */
- public long getTotalNumVertices() {
- return getGraphState().getTotalNumVertices();
- }
-
- /**
- * Get the total (all workers) number of edges that
- * existed in the previous superstep.
- *
- * @return Total number of edges (-1 if first superstep)
- */
- public long getTotalNumEdges() {
- return getGraphState().getTotalNumEdges();
- }
-
- /**
- * After this is called, the computation will stop, even if there are
- * still messages in the system or vertices that have not voted to halt.
- */
- public void haltComputation() {
- halt = true;
- }
-
- /**
- * Has the master halted?
- *
- * @return True if halted, false otherwise.
- */
- public boolean isHalted() {
- return halt;
- }
-
- /**
- * Get the graph state for all workers.
- *
- * @return Graph state for all workers
- */
- GraphState getGraphState() {
- return graphState;
- }
-
- /**
- * Set the graph state for all workers
- *
- * @param graphState Graph state for all workers
- */
- void setGraphState(GraphState graphState) {
- this.graphState = graphState;
- }
-
- /**
- * Get the mapper context
- *
- * @return Mapper context
- */
- public Mapper.Context getContext() {
- return getGraphState().getContext();
- }
-
-  /**
-   * Forwards the registration to the master aggregator usage obtained from
-   * the graph mapper of the current graph state.
-   */
-  @Override
-  public final <A extends Writable> boolean registerAggregator(
-      String name, Class<? extends Aggregator<A>> aggregatorClass)
-    throws InstantiationException, IllegalAccessException {
-    MasterAggregatorUsage usage =
-        getGraphState().getGraphMapper().getMasterAggregatorUsage();
-    return usage.registerAggregator(name, aggregatorClass);
-  }
-
-  /**
-   * Forwards the persistent registration to the master aggregator usage
-   * obtained from the graph mapper of the current graph state.
-   */
-  @Override
-  public final <A extends Writable> boolean registerPersistentAggregator(
-      String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    MasterAggregatorUsage usage =
-        getGraphState().getGraphMapper().getMasterAggregatorUsage();
-    return usage.registerPersistentAggregator(name, aggregatorClass);
-  }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return getGraphState().getGraphMapper().getMasterAggregatorUsage().
- <A>getAggregatedValue(name);
- }
-
- @Override
- public <A extends Writable> void setAggregatedValue(String name, A value) {
- getGraphState().getGraphMapper().getMasterAggregatorUsage().
- setAggregatedValue(name, value);
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterInfo.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterInfo.java
deleted file mode 100644
index 50ad6aa..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterInfo.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Information about the master that is sent to other workers.
- */
-public class MasterInfo extends TaskInfo {
- /**
- * Constructor
- */
- public MasterInfo() {
- }
-
- @Override
- public String toString() {
- return "Master(" + super.toString() + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
deleted file mode 100644
index e27de42..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.bsp.ApplicationState;
-import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.counters.GiraphTimers;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.log4j.Logger;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-/**
- * Master thread that will coordinate the activities of the tasks. It runs
- * on all task processes, however, will only execute its algorithm if it knows
- * it is the "leader" from ZooKeeper.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class MasterThread<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> extends Thread {
- /** Counter group name for the Giraph timers */
- public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(MasterThread.class);
- /** Reference to shared BspService */
- private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
- /** Context (for counters) */
- private final Context context;
- /** Use superstep counters? */
- private final boolean superstepCounterOn;
- /** Setup seconds */
- private double setupSecs = 0d;
- /** Superstep timer (in seconds) map */
- private final Map<Long, Double> superstepSecsMap =
- new TreeMap<Long, Double>();
-
- /**
- * Constructor. Names the thread after this class, initializes the Giraph
- * timers with the context and reads the superstep-counter switch from the
- * context's configuration.
- *
- * @param bspServiceMaster Master that already exists and setup() has
- * been called.
- * @param context Context from the Mapper.
- */
- MasterThread(CentralizedServiceMaster<I, V, E, M> bspServiceMaster,
- Context context) {
- super(MasterThread.class.getName());
- this.bspServiceMaster = bspServiceMaster;
- this.context = context;
- GiraphTimers.init(context);
- // decides whether run() increments a per-superstep timer counter
- superstepCounterOn = context.getConfiguration().getBoolean(
- GiraphConstants.USE_SUPERSTEP_COUNTERS,
- GiraphConstants.USE_SUPERSTEP_COUNTERS_DEFAULT);
- }
-
-  /**
-   * The master algorithm. The algorithm should be able to withstand
-   * failures and resume as necessary since the master may switch during a
-   * job.
-   *
-   * Setup, per-superstep, shutdown and total times are recorded in the
-   * Giraph timers and logged at info level. Any exception is logged first,
-   * then handed to the service's failureCleanup(), and finally rethrown
-   * wrapped in an IllegalStateException.
-   */
-  @Override
-  public void run() {
-    // Algorithm:
-    // 1. Become the master
-    // 2. If desired, restart from a manual checkpoint
-    // 3. Run all supersteps until complete
-    try {
-      long startMillis = System.currentTimeMillis();
-      long endMillis = 0;
-      bspServiceMaster.setup();
-      if (bspServiceMaster.becomeMaster()) {
-        // Attempt to create InputSplits if necessary. Bail out if that fails.
-        if (bspServiceMaster.getRestartedSuperstep() !=
-            BspService.UNSET_SUPERSTEP ||
-            (bspServiceMaster.createVertexInputSplits() != -1 &&
-                bspServiceMaster.createEdgeInputSplits() != -1)) {
-          long setupMillis = System.currentTimeMillis() - startMillis;
-          GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
-          setupSecs = setupMillis / 1000.0d;
-          SuperstepState superstepState = SuperstepState.INITIAL;
-          long cachedSuperstep = BspService.UNSET_SUPERSTEP;
-          while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
-            long startSuperstepMillis = System.currentTimeMillis();
-            // remember the superstep number before coordination advances it
-            cachedSuperstep = bspServiceMaster.getSuperstep();
-            GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
-            superstepState = bspServiceMaster.coordinateSuperstep();
-            long superstepMillis = System.currentTimeMillis() -
-                startSuperstepMillis;
-            superstepSecsMap.put(Long.valueOf(cachedSuperstep),
-                superstepMillis / 1000.0d);
-            if (LOG.isInfoEnabled()) {
-              LOG.info("masterThread: Coordination of superstep " +
-                  cachedSuperstep + " took " +
-                  superstepMillis / 1000.0d +
-                  " seconds ended with state " + superstepState +
-                  " and is now on superstep " +
-                  bspServiceMaster.getSuperstep());
-            }
-            if (superstepCounterOn) {
-              GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep).
-                  increment(superstepMillis);
-            }
-
-            bspServiceMaster.postSuperstep();
-
-            // If a worker failed, restart from a known good superstep
-            if (superstepState == SuperstepState.WORKER_FAILURE) {
-              bspServiceMaster.restartFromCheckpoint(
-                  bspServiceMaster.getLastGoodCheckpoint());
-            }
-            endMillis = System.currentTimeMillis();
-          }
-          bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
-        }
-      }
-      bspServiceMaster.cleanup();
-      // Timing summary is only produced when at least one superstep ran
-      if (!superstepSecsMap.isEmpty()) {
-        GiraphTimers.getInstance().getShutdownMs().
-            increment(System.currentTimeMillis() - endMillis);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("setup: Took " + setupSecs + " seconds.");
-        }
-        for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
-          if (LOG.isInfoEnabled()) {
-            if (entry.getKey().longValue() ==
-                BspService.INPUT_SUPERSTEP) {
-              LOG.info("input superstep: Took " +
-                  entry.getValue() + " seconds.");
-            } else {
-              LOG.info("superstep " + entry.getKey() + ": Took " +
-                  entry.getValue() + " seconds.");
-            }
-          }
-          context.progress();
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("shutdown: Took " +
-              (System.currentTimeMillis() - endMillis) /
-              1000.0d + " seconds.");
-          LOG.info("total: Took " +
-              ((System.currentTimeMillis() - startMillis) /
-              1000.0d) + " seconds.");
-        }
-        GiraphTimers.getInstance().getTotalMs().
-            increment(System.currentTimeMillis() - startMillis);
-      }
-      bspServiceMaster.postApplication();
-      // CHECKSTYLE: stop IllegalCatchCheck
-    } catch (Exception e) {
-      // CHECKSTYLE: resume IllegalCatchCheck
-      // Log before cleaning up: if failureCleanup() itself throws, the
-      // root cause has still been recorded.
-      LOG.error("masterThread: Master algorithm failed with " +
-          e.getClass().getSimpleName(), e);
-      bspServiceMaster.failureCleanup(e);
-      throw new IllegalStateException(e);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
deleted file mode 100644
index 19d6e0c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.Iterator;
-
-/**
- * An edge-list backed vertex that allows for parallel edges.
- * This can be used not only to support mutable multigraphs,
- * but also to make mutations and edge-based input efficient without
- * resorting to a hash-map backed vertex.
- *
- * Note: removeEdges() here removes all edges pointing to the target vertex
- * and returns how many edges were removed (0 if there are no such edges).
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class MultiGraphEdgeListVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends EdgeListVertexBase<I, V, E, M> {
-  /**
-   * Appends the edge without looking for an existing edge to the same
-   * target, so parallel edges are kept.
-   *
-   * @param edge Edge to add
-   * @return Always true
-   */
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    appendEdge(edge);
-    return true;
-  }
-
-  /**
-   * Removes every edge whose target vertex id equals the given id.
-   *
-   * @param targetVertexId Target vertex id of the edges to remove
-   * @return Number of edges removed
-   */
-  @Override
-  public int removeEdges(I targetVertexId) {
-    int removedCount = 0;
-    Iterator<Edge<I, E>> cursor = getEdges().iterator();
-    while (cursor.hasNext()) {
-      if (cursor.next().getTargetVertexId().equals(targetVertexId)) {
-        cursor.remove();
-        ++removedCount;
-      }
-    }
-    return removedCount;
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
deleted file mode 100644
index 40e929c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Similar to {@link RepresentativeVertex}, but allows for parallel edges.
- *
- * Note: removeEdge() here removes all edges pointing to the target vertex,
- * but returns only one of them (or null if there are no such edges).
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class MultiGraphRepresentativeVertex<I extends
- WritableComparable, V extends Writable, E extends Writable,
- M extends Writable> extends RepresentativeVertexBase<I, V, E, M> {
- @Override
- public final boolean addEdge(Edge<I, E> edge) {
- appendEdge(edge);
- return true;
- }
-
- @Override
- public final int removeEdges(I targetVertexId) {
- return removeAllEdges(targetVertexId);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MutableVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/MutableVertex.java
deleted file mode 100644
index 04c17ed..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MutableVertex.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-import java.util.Collections;
-
-/**
- * Interface used by VertexReader to set the properties of a new vertex
- * or mutate the graph.
- *
- * Subclasses supply the immediate edge mutations (addEdge/removeEdges);
- * the *Request methods hand the mutation to the worker client request
- * processor obtained from the graph state.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class MutableVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends Vertex<I, V, E, M> {
-  /**
-   * Add an edge for this vertex (happens immediately)
-   *
-   * @param edge Edge to add
-   * @return Return true if succeeded, false otherwise
-   */
-  public abstract boolean addEdge(Edge<I, E> edge);
-
-  /**
-   * Removes all edges pointing to the given vertex id.
-   *
-   * @param targetVertexId the target vertex id
-   * @return the number of removed edges
-   */
-  public abstract int removeEdges(I targetVertexId);
-
-  /**
-   * Sends a request to create a vertex that will be available during the
-   * next superstep. The vertex is created through the configuration and
-   * initialized with the given id, value and edges before it is sent.
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   * @param edges Initial edges
-   * @throws IOException propagated from the request processor
-   */
-  public void addVertexRequest(I id, V value, Iterable<Edge<I, E>> edges)
-    throws IOException {
-    Vertex<I, V, E, M> newVertex = getConf().createVertex();
-    newVertex.initialize(id, value, edges);
-    getGraphState().getWorkerClientRequestProcessor().
-        addVertexRequest(newVertex);
-  }
-
-  /**
-   * Sends a request to create a vertex without edges that will be
-   * available during the next superstep.
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   * @throws IOException propagated from the request processor
-   */
-  public void addVertexRequest(I id, V value) throws IOException {
-    Iterable<Edge<I, E>> noEdges = Collections.<Edge<I, E>>emptyList();
-    addVertexRequest(id, value, noEdges);
-  }
-
-  /**
-   * Request to remove a vertex from the graph
-   * (applied just prior to the next superstep).
-   *
-   * @param vertexId Id of the vertex to be removed.
-   * @throws IOException propagated from the request processor
-   */
-  public void removeVertexRequest(I vertexId) throws IOException {
-    getGraphState().
-        getWorkerClientRequestProcessor().
-        removeVertexRequest(vertexId);
-  }
-
-  /**
-   * Request to add an edge of a vertex in the graph
-   * (processed just prior to the next superstep)
-   *
-   * @param sourceVertexId Source vertex id of edge
-   * @param edge Edge to add
-   * @throws IOException propagated from the request processor
-   */
-  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
-    throws IOException {
-    getGraphState().
-        getWorkerClientRequestProcessor().
-        addEdgeRequest(sourceVertexId, edge);
-  }
-
-  /**
-   * Request to remove all edges from a given source vertex to a given target
-   * vertex (processed just prior to the next superstep).
-   *
-   * @param sourceVertexId Source vertex id
-   * @param targetVertexId Target vertex id
-   * @throws IOException propagated from the request processor
-   */
-  public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
-    throws IOException {
-    getGraphState().
-        getWorkerClientRequestProcessor().
-        removeEdgesRequest(sourceVertexId, targetVertexId);
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
deleted file mode 100644
index 673b402..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * This vertex should only be used in conjunction with ByteArrayPartition since
- * it has special code to deserialize by reusing objects and not instantiating
- * new ones. If used without ByteArrayPartition, it will cause a lot of
- * wasted memory.
- *
- * Also, this vertex is optimized for space and not efficient for either adding
- * or random access of edges.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class RepresentativeVertex<
- I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends RepresentativeVertexBase<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(RepresentativeVertex.class);
-
- @Override
- public final boolean addEdge(Edge<I, E> edge) {
- // Note that this is very expensive (deserializes all edges
- // in an addEdge() request).
- // Hopefully the user set all the edges in setEdges().
- for (Edge<I, E> currentEdge : getEdges()) {
- if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
- LOG.warn("addEdge: Vertex=" + getId() +
- ": already added an edge value for target vertex id " +
- edge.getTargetVertexId());
- return false;
- }
- }
- appendEdge(edge);
- return true;
- }
-
- @Override
- public final int removeEdges(I targetVertexId) {
- return removeFirstEdge(targetVertexId) ? 1 : 0;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
deleted file mode 100644
index d0e4bfb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.utils.ExtendedDataInput;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Common base class for representative vertices.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class RepresentativeVertexBase<
- I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends MutableVertex<I, V, E, M> implements Iterable<Edge<I, E>> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(RepresentativeVertex.class);
- /** Representative edge */
- private final Edge<I, E> representativeEdge = new Edge<I, E>();
- /** Serialized edges */
- private byte[] serializedEdges;
- /** Byte used in serializedEdges */
- private int serializedEdgesBytesUsed;
- /** Number of edges */
- private int edgeCount;
-
- /**
- * Append an edge to the serialized representation.
- *
- * @param edge Edge to append
- */
- protected void appendEdge(Edge<I, E> edge) {
- ExtendedDataOutput extendedDataOutput =
- getConf().createExtendedDataOutput(
- serializedEdges, serializedEdgesBytesUsed);
- try {
- edge.getTargetVertexId().write(extendedDataOutput);
- edge.getValue().write(extendedDataOutput);
- } catch (IOException e) {
- throw new IllegalStateException("addEdge: Failed to write to the " +
- "new byte array");
- }
- serializedEdges = extendedDataOutput.getByteArray();
- serializedEdgesBytesUsed = extendedDataOutput.getPos();
- ++edgeCount;
- }
-
- /**
- * Remove the first edge pointing to a target vertex.
- *
- * @param targetVertexId Target vertex id
- * @return True if one such edge was found and removed.
- */
- protected boolean removeFirstEdge(I targetVertexId) {
- // Note that this is very expensive (deserializes all edges
- // in an removedge() request).
- // Hopefully the user set all the edges correctly in setEdges().
- RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
- int foundStartOffset = 0;
- while (iterator.hasNext()) {
- Edge<I, E> edge = iterator.next();
- if (edge.getTargetVertexId().equals(targetVertexId)) {
- System.arraycopy(serializedEdges, iterator.extendedDataInput.getPos(),
- serializedEdges, foundStartOffset,
- serializedEdgesBytesUsed - iterator.extendedDataInput.getPos());
- serializedEdgesBytesUsed -=
- iterator.extendedDataInput.getPos() - foundStartOffset;
- --edgeCount;
- return true;
- }
- foundStartOffset = iterator.extendedDataInput.getPos();
- }
-
- return false;
- }
-
- /**
- * Remove all edges pointing to a target vertex.
- *
- * @param targetVertexId Target vertex id
- * @return The number of removed edges
- */
- protected int removeAllEdges(I targetVertexId) {
- // Note that this is very expensive (deserializes all edges
- // in an removedge() request).
- // Hopefully the user set all the edges correctly in setEdges().
- RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
- int removedCount = 0;
- List<Integer> foundStartOffsets = new LinkedList<Integer>();
- List<Integer> foundEndOffsets = new LinkedList<Integer>();
- int lastStartOffset = 0;
- while (iterator.hasNext()) {
- Edge<I, E> edge = iterator.next();
- if (edge.getTargetVertexId().equals(targetVertexId)) {
- foundStartOffsets.add(lastStartOffset);
- foundEndOffsets.add(iterator.extendedDataInput.getPos());
- ++removedCount;
- }
- lastStartOffset = iterator.extendedDataInput.getPos();
- }
- foundStartOffsets.add(serializedEdgesBytesUsed);
-
- Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
- Integer foundStartOffset = foundStartOffsetIter.next();
- for (Integer foundEndOffset : foundEndOffsets) {
- Integer nextFoundStartOffset = foundStartOffsetIter.next();
- System.arraycopy(serializedEdges, foundEndOffset,
- serializedEdges, foundStartOffset,
- nextFoundStartOffset - foundEndOffset);
- serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
- foundStartOffset = nextFoundStartOffset;
- }
-
- edgeCount -= removedCount;
- return removedCount;
- }
-
- @Override
- public final void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
- // Make sure the initial values exist
- representativeEdge.setTargetVertexId(getConf().createVertexId());
- representativeEdge.setValue(getConf().createEdgeValue());
- super.initialize(id, value, edges);
- }
-
- @Override
- public final void initialize(I id, V value) {
- // Make sure the initial values exist
- representativeEdge.setTargetVertexId(getConf().createVertexId());
- representativeEdge.setValue(getConf().createEdgeValue());
- super.initialize(id, value);
- }
-
- /**
- * Iterator that uses the representative edge (only one iterator allowed
- * at a time)
- */
- private final class RepresentativeEdgeIterator implements
- Iterator<Edge<I, E>> {
- /** Input for processing the bytes */
- private final ExtendedDataInput extendedDataInput;
-
- /** Constructor. */
- RepresentativeEdgeIterator() {
- extendedDataInput = getConf().createExtendedDataInput(
- serializedEdges, 0, serializedEdgesBytesUsed);
- }
- @Override
- public boolean hasNext() {
- return serializedEdges != null && extendedDataInput.available() > 0;
- }
-
- @Override
- public Edge<I, E> next() {
- try {
- representativeEdge.getTargetVertexId().readFields(extendedDataInput);
- representativeEdge.getValue().readFields(extendedDataInput);
- } catch (IOException e) {
- throw new IllegalStateException("next: Failed on pos " +
- extendedDataInput.getPos() + " edge " + representativeEdge);
- }
- return representativeEdge;
- }
-
- @Override
- public void remove() {
- throw new IllegalAccessError("remove: Not supported");
- }
- }
-
- @Override
- public final Iterator<Edge<I, E>> iterator() {
- return new RepresentativeEdgeIterator();
- }
-
- @Override
- public final void setEdges(Iterable<Edge<I, E>> edges) {
- ExtendedDataOutput extendedOutputStream =
- getConf().createExtendedDataOutput();
- if (edges != null) {
- for (Edge<I, E> edge : edges) {
- try {
- edge.getTargetVertexId().write(extendedOutputStream);
- edge.getValue().write(extendedOutputStream);
- } catch (IOException e) {
- throw new IllegalStateException("setEdges: Failed to serialize " +
- edge);
- }
- ++edgeCount;
- }
- }
- serializedEdges = extendedOutputStream.getByteArray();
- serializedEdgesBytesUsed = extendedOutputStream.getPos();
- }
-
- @Override
- public final Iterable<Edge<I, E>> getEdges() {
- return this;
- }
-
- @Override
- public final int getNumEdges() {
- return edgeCount;
- }
-
- @Override
- public final void readFields(DataInput in) throws IOException {
- // Ensure these objects are present
- if (representativeEdge.getTargetVertexId() == null) {
- representativeEdge.setTargetVertexId(getConf().createVertexId());
- }
-
- if (representativeEdge.getValue() == null) {
- representativeEdge.setValue(getConf().createEdgeValue());
- }
-
- I vertexId = getId();
- if (vertexId == null) {
- vertexId = getConf().createVertexId();
- }
- vertexId.readFields(in);
-
- V vertexValue = getValue();
- if (vertexValue == null) {
- vertexValue = getConf().createVertexValue();
- }
- vertexValue.readFields(in);
-
- initialize(vertexId, vertexValue);
-
- serializedEdgesBytesUsed = in.readInt();
- // Only create a new buffer if the old one isn't big enough
- if (serializedEdges == null ||
- serializedEdgesBytesUsed > serializedEdges.length) {
- serializedEdges = new byte[serializedEdgesBytesUsed];
- }
- in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
- edgeCount = in.readInt();
-
- readHaltBoolean(in);
- }
-
- @Override
- public final void write(DataOutput out) throws IOException {
- getId().write(out);
- getValue().write(out);
-
- out.writeInt(serializedEdgesBytesUsed);
- out.write(serializedEdges, 0, serializedEdgesBytesUsed);
- out.writeInt(edgeCount);
-
- out.writeBoolean(isHalted());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
deleted file mode 100644
index 4843dd5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.utils.EdgeIterables;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Mutable vertex with no edge values.
- * Edges are handled as plain neighbor ids; every edge value is
- * {@link NullWritable}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <M> Message data
- */
-public abstract class SimpleMutableVertex<I extends WritableComparable,
-    V extends Writable, M extends Writable>
-    extends MutableVertex<I, V, NullWritable, M> {
-  /**
-   * Set the neighbors of this vertex.
-   *
-   * @param neighbors Iterable of destination vertex ids.
-   */
-  public abstract void setNeighbors(Iterable<I> neighbors);
-
-  /**
-   * Sets the edges by passing the result of EdgeIterables.getNeighbors()
-   * to setNeighbors().
-   *
-   * @param edges Edges to set
-   */
-  @Override
-  public void setEdges(Iterable<Edge<I, NullWritable>> edges) {
-    Iterable<I> neighbors = EdgeIterables.getNeighbors(edges);
-    setNeighbors(neighbors);
-  }
-
-  /**
-   * Get a read-only view of the neighbors of this
-   * vertex, i.e. the target vertices of its out-edges.
-   *
-   * @return the neighbors (sort order determined by subclass implementation).
-   */
-  public abstract Iterable<I> getNeighbors();
-
-  /**
-   * Returns the result of EdgeIterables.getEdges() for the neighbors.
-   *
-   * @return Edges of this vertex
-   */
-  @Override
-  public Iterable<Edge<I, NullWritable>> getEdges() {
-    Iterable<I> neighbors = getNeighbors();
-    return EdgeIterables.getEdges(neighbors);
-  }
-
-  /**
-   * Edges carry no value, so the target id is not looked at.
-   *
-   * @param targetVertexId Target vertex id (ignored)
-   * @return The NullWritable singleton
-   */
-  @Override
-  public NullWritable getEdgeValue(I targetVertexId) {
-    return NullWritable.get();
-  }
-
-  /**
-   * Add an edge for this vertex (happens immediately)
-   *
-   * @param targetVertexId target vertex
-   * @return Return true if succeeded, false otherwise
-   */
-  public abstract boolean addEdge(I targetVertexId);
-
-  @Override
-  public boolean addEdge(Edge<I, NullWritable> edge) {
-    I targetVertexId = edge.getTargetVertexId();
-    return addEdge(targetVertexId);
-  }
-
-  /**
-   * Request to add an edge of a vertex in the graph
-   * (processed just prior to the next superstep)
-   *
-   * NOTE(review): the requested edge uses sourceVertexId as its target as
-   * well, i.e. it is a self-loop on the source vertex. Confirm this is
-   * intended; the signature offers no separate target id.
-   *
-   * @param sourceVertexId Source vertex id of edge
-   * @throws IOException propagated from the request processor
-   */
-  public void addEdgeRequest(I sourceVertexId) throws IOException {
-    Edge<I, NullWritable> edge =
-        new Edge<I, NullWritable>(sourceVertexId, NullWritable.get());
-    getGraphState().getWorkerClientRequestProcessor().
-        addEdgeRequest(sourceVertexId, edge);
-  }
-
-  /**
-   * Reads id, value, the number of edges followed by that many target
-   * ids, and finally the halt flag.
-   *
-   * @param in Input to read from
-   * @throws IOException propagated from the input
-   */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    I id = (I) getConf().createVertexId();
-    id.readFields(in);
-    V value = (V) getConf().createVertexValue();
-    value.readFields(in);
-
-    int edgeCount = in.readInt();
-    List<Edge<I, NullWritable>> edgeList =
-        Lists.newArrayListWithCapacity(edgeCount);
-    for (int remaining = edgeCount; remaining > 0; --remaining) {
-      I targetId = (I) getConf().createVertexId();
-      targetId.readFields(in);
-      edgeList.add(new Edge<I, NullWritable>(targetId, NullWritable.get()));
-    }
-
-    initialize(id, value, edgeList);
-
-    readHaltBoolean(in);
-  }
-
-  /**
-   * Writes id, value, the number of edges, every neighbor id and the
-   * halt flag.
-   *
-   * @param out Output to write to
-   * @throws IOException propagated from the output
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    int edgeCount = getNumEdges();
-    out.writeInt(edgeCount);
-    for (I neighborId : getNeighbors()) {
-      neighborId.write(out);
-    }
-
-    boolean halted = isHalted();
-    out.writeBoolean(halted);
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/SimpleVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/SimpleVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/SimpleVertex.java
deleted file mode 100644
index 0d56d95..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/SimpleVertex.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.utils.EdgeIterables;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Vertex with no edge values.
- * Edges are handled as plain neighbor ids; every edge value is
- * {@link NullWritable}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <M> Message data
- */
-public abstract class SimpleVertex<I extends WritableComparable,
-    V extends Writable, M extends Writable>
-    extends Vertex<I, V, NullWritable, M> {
-  /**
-   * Set the neighbors of this vertex.
-   *
-   * @param neighbors Iterable of destination vertex ids.
-   */
-  public abstract void setNeighbors(Iterable<I> neighbors);
-
-  /**
-   * Sets the edges by passing the result of EdgeIterables.getNeighbors()
-   * to setNeighbors().
-   *
-   * @param edges Edges to set
-   */
-  @Override
-  public void setEdges(Iterable<Edge<I, NullWritable>> edges) {
-    Iterable<I> neighbors = EdgeIterables.getNeighbors(edges);
-    setNeighbors(neighbors);
-  }
-
-  /**
-   * Get a read-only view of the neighbors of this
-   * vertex, i.e. the target vertices of its out-edges.
-   *
-   * @return the neighbors (sort order determined by subclass implementation).
-   */
-  public abstract Iterable<I> getNeighbors();
-
-  /**
-   * Returns the result of EdgeIterables.getEdges() for the neighbors.
-   *
-   * @return Edges of this vertex
-   */
-  @Override
-  public Iterable<Edge<I, NullWritable>> getEdges() {
-    Iterable<I> neighbors = getNeighbors();
-    return EdgeIterables.getEdges(neighbors);
-  }
-
-  /**
-   * Edges carry no value, so the target id is not looked at.
-   *
-   * @param targetVertexId Target vertex id (ignored)
-   * @return The NullWritable singleton
-   */
-  @Override
-  public NullWritable getEdgeValue(I targetVertexId) {
-    return NullWritable.get();
-  }
-
-  /**
-   * Reads id, value, the number of edges followed by that many neighbor
-   * ids, and finally the halt flag.
-   *
-   * @param in Input to read from
-   * @throws IOException propagated from the input
-   */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    I id = getConf().createVertexId();
-    id.readFields(in);
-    V value = getConf().createVertexValue();
-    value.readFields(in);
-
-    int neighborCount = in.readInt();
-    List<I> neighborIds = Lists.newArrayListWithCapacity(neighborCount);
-    for (int remaining = neighborCount; remaining > 0; --remaining) {
-      I neighborId = getConf().createVertexId();
-      neighborId.readFields(in);
-      neighborIds.add(neighborId);
-    }
-
-    initialize(id, value);
-    setNeighbors(neighborIds);
-
-    readHaltBoolean(in);
-  }
-
-  /**
-   * Writes id, value, the number of edges, every neighbor id and the
-   * halt flag.
-   *
-   * @param out Output to write to
-   * @throws IOException propagated from the output
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    int edgeCount = getNumEdges();
-    out.writeInt(edgeCount);
-    for (I neighborId : getNeighbors()) {
-      neighborId.write(out);
-    }
-
-    boolean halted = isHalted();
-    out.writeBoolean(halted);
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java b/giraph-core/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
deleted file mode 100644
index abdba44..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import java.io.IOException;
-import java.util.Map.Entry;
-
-import com.google.common.base.Charsets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-
-/**
- * Default implementation of {@link AggregatorWriter}. Each line consists of
- * text and contains the superstep, the aggregator name and the aggregator
- * value.
- */
-public class TextAggregatorWriter implements AggregatorWriter {
-  /** The filename of the outputfile */
-  public static final String FILENAME =
-      "giraph.textAggregatorWriter.filename";
-  /** Signal for "never write" frequency */
-  public static final int NEVER = 0;
-  /** Signal for "write only the final values" frequency */
-  public static final int AT_THE_END = -1;
-  /**
-   * Signal for "write values in every superstep" frequency.
-   * Must differ from AT_THE_END; a frequency of 1 means every superstep.
-   */
-  public static final int ALWAYS = 1;
-  /** The frequency of writing:
-   *  - NEVER: never write, files aren't created at all
-   *  - AT_THE_END: aggregators are written only when the computation is over
-   *  - ALWAYS: aggregators are written in every superstep
-   *  - int: i.e. 1 is every superstep, 2 every two supersteps and so on
-   */
-  public static final String FREQUENCY =
-      "giraph.textAggregatorWriter.frequency";
-  /** Default filename for dumping aggregator values */
-  private static final String DEFAULT_FILENAME = "aggregatorValues";
-  /** Handle to the outputfile */
-  protected FSDataOutputStream output;
-  /** Write every "frequency" supersteps */
-  private int frequency;
-
-  /**
-   * Reads the frequency and filename from the configuration and, unless
-   * the frequency is NEVER, creates the output file named
-   * filename + "_" + attempt.
-   *
-   * @param context Mapper context providing the configuration
-   * @param attempt Attempt number, used as filename suffix
-   * @throws IOException propagated from the file system
-   */
-  @Override
-  @SuppressWarnings("rawtypes")
-  public void initialize(Context context, long attempt) throws IOException {
-    Configuration conf = context.getConfiguration();
-    frequency = conf.getInt(FREQUENCY, NEVER);
-    String filename = conf.get(FILENAME, DEFAULT_FILENAME);
-    if (frequency != NEVER) {
-      Path p = new Path(filename + "_" + attempt);
-      FileSystem fs = FileSystem.get(conf);
-      // Refuse to overwrite the output of an earlier run
-      if (fs.exists(p)) {
-        throw new RuntimeException("aggregatorWriter file already" +
-            " exists: " + p.getName());
-      }
-      output = fs.create(p);
-    }
-  }
-
-  /**
-   * Writes one line per aggregator (UTF-8 encoded) and flushes, but only
-   * for supersteps selected by the configured frequency.
-   *
-   * @param aggregatorMap Aggregator names with their values
-   * @param superstep Current superstep
-   * @throws IOException propagated from the output stream
-   */
-  @Override
-  public void writeAggregator(
-      Iterable<Entry<String, Writable>> aggregatorMap,
-      long superstep) throws IOException {
-    if (shouldWrite(superstep)) {
-      for (Entry<String, Writable> entry : aggregatorMap) {
-        byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
-            superstep).getBytes(Charsets.UTF_8);
-        output.write(bytes, 0, bytes.length);
-      }
-      output.flush();
-    }
-  }
-
-  /**
-   * Implements the way an aggregator is converted into a String.
-   * Override this if you want to implement your own text format.
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param value Value of aggregator
-   * @param superstep Current superstep
-   * @return The String representation for the aggregator
-   */
-  protected String aggregatorToString(String aggregatorName,
-      Writable value,
-      long superstep) {
-    return new StringBuilder("superstep=").append(superstep).append("\t")
-        .append(aggregatorName).append("=").append(value).append("\n")
-        .toString();
-  }
-
-  /**
-   * Should write this superstep?
-   *
-   * @param superstep Superstep to check
-   * @return True if should write, false otherwise
-   */
-  private boolean shouldWrite(long superstep) {
-    if (frequency == NEVER) {
-      return false;
-    }
-    if (frequency == AT_THE_END) {
-      // Must be decided before the modulo test below: AT_THE_END is -1
-      // and "superstep % -1" is 0 for every superstep, which would
-      // otherwise make this mode write in every superstep.
-      return superstep == LAST_SUPERSTEP;
-    }
-    return superstep % frequency == 0;
-  }
-
-  /**
-   * Closes the output file if one was opened.
-   *
-   * @throws IOException propagated from the output stream
-   */
-  @Override
-  public void close() throws IOException {
-    if (output != null) {
-      output.close();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
deleted file mode 100644
index 6db4735..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Basic interface for writing a BSP application for computation.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class Vertex<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements WorkerAggregatorUsage, Writable,
- ImmutableClassesGiraphConfigurable<I, V, E, M> {
- /** Vertex id. */
- private I id;
- /** Vertex value. */
- private V value;
- /** If true, do not do anymore computation on this vertex. */
- private boolean halt;
- /** Global graph state **/
- private GraphState<I, V, E, M> graphState;
- /** Configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-
- /**
- * This method must be called after instantiation of a vertex
- * with ImmutableClassesGiraphConfiguration
- * unless deserialization from readFields() is
- * called.
- *
- * @param id Will be the vertex id
- * @param value Will be the vertex value
- * @param edges Iterable of edges
- */
- public void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
- this.id = id;
- this.value = value;
- setEdges(edges);
- }
-
- /**
- * This method only sets id and value. Can be used by Vertex
- * implementations in readFields().
- *
- * @param id Vertex id
- * @param value Vertex value
- */
- public void initialize(I id, V value) {
- this.id = id;
- this.value = value;
- setEdges(Collections.<Edge<I, E>>emptyList());
- }
-
- /**
- * Set the outgoing edges for this vertex.
- *
- * @param edges Iterable of edges
- */
- public abstract void setEdges(Iterable<Edge<I, E>> edges);
-
- /**
- * Must be defined by user to do computation on a single Vertex.
- *
- * @param messages Messages that were sent to this vertex in the previous
- * superstep. Each message is only guaranteed to have
- * a life expectancy as long as next() is not called.
- * @throws IOException
- */
- public abstract void compute(Iterable<M> messages) throws IOException;
-
- /**
- * Retrieves the current superstep.
- *
- * @return Current superstep
- */
- public long getSuperstep() {
- return getGraphState().getSuperstep();
- }
-
- /**
- * Get the vertex id.
- *
- * @return My vertex id.
- */
- public I getId() {
- return id;
- }
-
- /**
- * Get the vertex value (data stored with vertex)
- *
- * @return Vertex value
- */
- public V getValue() {
- return value;
- }
-
- /**
- * Set the vertex data (immediately visible in the computation)
- *
- * @param value Vertex data to be set
- */
- public void setValue(V value) {
- this.value = value;
- }
-
- /**
- * Get the total (all workers) number of vertices that
- * existed in the previous superstep.
- *
- * @return Total number of vertices (-1 if first superstep)
- */
- public long getTotalNumVertices() {
- return getGraphState().getTotalNumVertices();
- }
-
- /**
- * Get the total (all workers) number of edges that
- * existed in the previous superstep.
- *
- * @return Total number of edges (-1 if first superstep)
- */
- public long getTotalNumEdges() {
- return getGraphState().getTotalNumEdges();
- }
-
- /**
- * Get a read-only view of the out-edges of this vertex.
- *
- * @return the out edges (sort order determined by subclass implementation).
- */
- public abstract Iterable<Edge<I, E>> getEdges();
-
- /**
- * Does an edge with the target vertex id exist?
- *
- * @param targetVertexId Target vertex id to check
- * @return true if there is an edge to the target
- */
- public boolean hasEdge(I targetVertexId) {
- for (Edge<I, E> edge : getEdges()) {
- if (edge.getTargetVertexId().equals(targetVertexId)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Get the edge value associated with a target vertex id.
- *
- * @param targetVertexId Target vertex id to check
- *
- * @return the value of the edge to targetVertexId (or null if there
- * is no edge to it)
- */
- public E getEdgeValue(I targetVertexId) {
- for (Edge<I, E> edge : getEdges()) {
- if (edge.getTargetVertexId().equals(targetVertexId)) {
- return edge.getValue();
- }
- }
- return null;
- }
-
- /**
- * Get the number of outgoing edges on this vertex.
- *
- * @return the total number of outbound edges from this vertex
- */
- public int getNumEdges() {
- return Iterables.size(getEdges());
- }
-
- /**
- * Send a message to a vertex id. The message should not be mutated after
- * this method returns or else undefined results could occur.
- *
- * @param id Vertex id to send the message to
- * @param message Message data to send. Note that after the message is sent,
- * the user should not modify the object.
- */
- public void sendMessage(I id, M message) {
- if (message == null) {
- throw new IllegalArgumentException(
- "sendMessage: Cannot send null message to " + id);
- }
- if (graphState.getWorkerClientRequestProcessor().
- sendMessageRequest(id, message)) {
- graphState.getGraphMapper().notifySentMessages();
- }
- }
-
- /**
- * Lookup WorkerInfo for myself.
- *
- * @return WorkerInfo about worker holding this Vertex.
- */
- public WorkerInfo getMyWorkerInfo() {
- return getVertexWorkerInfo(id);
- }
-
- /**
- * Lookup WorkerInfo for a Vertex.
- *
- * @param vertexId VertexId to lookup
- * @return WorkerInfo about worker holding this Vertex.
- */
- public WorkerInfo getVertexWorkerInfo(I vertexId) {
- return getVertexPartitionOwner(vertexId).getWorkerInfo();
- }
-
- /**
- * Lookup PartitionOwner for a Vertex
- *
- * @param vertexId id of Vertex to look up.
- * @return PartitionOwner holding Vertex
- */
- private PartitionOwner getVertexPartitionOwner(I vertexId) {
- return getGraphState().getWorkerClientRequestProcessor().
- getVertexPartitionOwner(vertexId);
- }
-
- /**
- * Send a message to all edges.
- *
- * @param message Message sent to all edges.
- */
- public void sendMessageToAllEdges(M message) {
- for (Edge<I, E> edge : getEdges()) {
- sendMessage(edge.getTargetVertexId(), message);
- }
- }
-
- /**
- * After this is called, the compute() code will no longer be called for
- * this vertex unless a message is sent to it. Then the compute() code
- * will be called once again until this function is called. The
- * application finishes only when all vertices vote to halt.
- */
- public void voteToHalt() {
- halt = true;
- }
-
- /**
- * Re-activate vertex if halted.
- */
- public void wakeUp() {
- halt = false;
- }
-
- /**
- * Is this vertex done?
- *
- * @return True if halted, false otherwise.
- */
- public boolean isHalted() {
- return halt;
- }
-
- /**
- * Get the graph state for all workers.
- *
- * @return Graph state for all workers
- */
- GraphState<I, V, E, M> getGraphState() {
- return graphState;
- }
-
- /**
- * Set the graph state for all workers
- *
- * @param graphState Graph state for all workers
- */
- void setGraphState(GraphState<I, V, E, M> graphState) {
- this.graphState = graphState;
- }
-
- /**
- * Get the mapper context
- *
- * @return Mapper context
- */
- public Mapper.Context getContext() {
- return getGraphState().getContext();
- }
-
- /**
- * Get the worker context
- *
- * @return WorkerContext context
- */
- public WorkerContext getWorkerContext() {
- return getGraphState().getGraphMapper().getWorkerContext();
- }
-
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- getGraphState().getWorkerAggregatorUsage().
- aggregate(name, value);
- }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return getGraphState().getWorkerAggregatorUsage().
- <A>getAggregatedValue(name);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- I vertexId = (I) getConf().createVertexId();
- vertexId.readFields(in);
- V vertexValue = (V) getConf().createVertexValue();
- vertexValue.readFields(in);
-
- int numEdges = in.readInt();
- List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(numEdges);
- for (int i = 0; i < numEdges; ++i) {
- I targetVertexId = (I) getConf().createVertexId();
- targetVertexId.readFields(in);
- E edgeValue = (E) getConf().createEdgeValue();
- edgeValue.readFields(in);
- edges.add(new Edge<I, E>(targetVertexId, edgeValue));
- }
-
- initialize(vertexId, vertexValue, edges);
-
- readHaltBoolean(in);
- }
-
- /**
- * Helper method for subclasses which implement their own readFields() to use.
- *
- * @param in DataInput to read from.
- * @throws IOException If anything goes wrong during read.
- */
- protected void readHaltBoolean(DataInput in) throws IOException {
- halt = in.readBoolean();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- getId().write(out);
- getValue().write(out);
-
- out.writeInt(getNumEdges());
- for (Edge<I, E> edge : getEdges()) {
- edge.getTargetVertexId().write(out);
- edge.getValue().write(out);
- }
-
- out.writeBoolean(halt);
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
- return conf;
- }
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
- this.conf = conf;
- }
-
- @Override
- public String toString() {
- return "Vertex(id=" + getId() + ",value=" + getValue() +
- ",#edges=" + getNumEdges() + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
index 957cf82..b529202 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
import java.util.List;
+import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
deleted file mode 100644
index 9824b69..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Use this to load data for a BSP application. Note that the InputSplit must
- * also implement Writable. The InputSplits will determine the partitioning of
- * vertices across the mappers, so keep that in consideration when implementing
- * getSplits().
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class VertexInputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements GiraphInputFormat {
- /**
- * Logically split the vertices for a graph processing application.
- *
- * Each {@link InputSplit} is then assigned to a worker for processing.
- *
- * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
- * input files are not physically split into chunks. For example, a split
- * could be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The
- * InputFormat also creates the {@link VertexReader} to read the
- * {@link InputSplit}.
- *
- * Also, the number of workers is a hint given to the developer to try to
- * intelligently determine how many splits to create (if this is
- * adjustable) at runtime.
- *
- * @param context Context of the job
- * @param numWorkers Number of workers used for this job
- * @return a list of {@link InputSplit}s for the job.
- * @throws IOException May be thrown by implementations while computing
- * the splits
- * @throws InterruptedException May be thrown by implementations while
- * computing the splits
- */
- @Override
- public abstract List<InputSplit> getSplits(
- JobContext context, int numWorkers)
- throws IOException, InterruptedException;
-
- /**
- * Create a vertex reader for a given split. The framework will call
- * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
- * the split is used.
- *
- * @param split the split to be read
- * @param context the information about the task
- * @return a new vertex reader for the split
- * @throws IOException May be thrown by implementations while creating
- * the reader
- */
- public abstract VertexReader<I, V, E, M> createVertexReader(
- InputSplit split,
- TaskAttemptContext context) throws IOException;
-}