You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC

[16/23] GIRAPH-409: Refactor / cleanups (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
deleted file mode 100644
index ef1e2ff..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Map;
-
-/** Handler for aggregators on master */
-public class MasterAggregatorHandler implements MasterAggregatorUsage,
-    Writable {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(MasterAggregatorHandler.class);
-  /**
-   * Map of aggregators.
-   * This map is used to store final aggregated values received from worker
-   * owners, and also to read and write values provided during master.compute.
-   */
-  private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
-      Maps.newHashMap();
-  /** Aggregator writer */
-  private final AggregatorWriter aggregatorWriter;
-  /** Progressable used to report progress */
-  private final Progressable progressable;
-
-  /**
-   * Constructor
-   *
-   * @param conf         Giraph configuration
-   * @param progressable Progressable used for reporting progress
-   */
-  public MasterAggregatorHandler(
-      ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf,
-      Progressable progressable) {
-    this.progressable = progressable;
-    aggregatorWriter = conf.createAggregatorWriter();
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
-    if (aggregator == null) {
-      return null;
-    } else {
-      return (A) aggregator.getPreviousAggregatedValue();
-    }
-  }
-
-  @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
-    if (aggregator == null) {
-      throw new IllegalStateException(
-          "setAggregatedValue: Tried to set value of aggregator which wasn't" +
-              " registered " + name);
-    }
-    ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
-  }
-
-  @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    return registerAggregator(name, aggregatorClass, false) != null;
-  }
-
-  @Override
-  public <A extends Writable> boolean registerPersistentAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    return registerAggregator(name, aggregatorClass, true) != null;
-  }
-
-  /**
-   * Make sure the user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR
-   * as the name of an aggregator. Throws an exception if they try to use it.
-   *
-   * @param name Name of the aggregator to check.
-   */
-  private void checkAggregatorName(String name) {
-    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-      throw new IllegalStateException("checkAggregatorName: " +
-          AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
-          " is not allowed for the name of aggregator");
-    }
-  }
-
-  /**
-   * Helper function for registering aggregators.
-   *
-   * @param name            Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
-   * @param persistent      Whether aggregator is persistent or not
-   * @param <A>             Aggregated value type
-   * @return Newly registered aggregator or aggregator which was previously
-   *         created with selected name, if any
-   */
-  private <A extends Writable> AggregatorWrapper<A> registerAggregator
-  (String name, Class<? extends Aggregator<A>> aggregatorClass,
-      boolean persistent) throws InstantiationException,
-      IllegalAccessException {
-    AggregatorWrapper<A> aggregatorWrapper =
-        (AggregatorWrapper<A>) aggregatorMap.get(name);
-    if (aggregatorWrapper == null) {
-      aggregatorWrapper =
-          new AggregatorWrapper<A>(aggregatorClass, persistent);
-      aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
-    }
-    return aggregatorWrapper;
-  }
-
-  /**
-   * Prepare aggregators for current superstep
-   *
-   * @param masterClient IPC client on master
-   */
-  public void prepareSuperstep(MasterClient masterClient) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("prepareSuperstep: Start preapring aggregators");
-    }
-    // prepare aggregators for master compute
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isPersistent()) {
-        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
-      }
-      aggregator.setPreviousAggregatedValue(
-          aggregator.getCurrentAggregatedValue());
-      aggregator.resetCurrentAggregator();
-      progressable.progress();
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("prepareSuperstep: Aggregators prepared");
-    }
-  }
-
-  /**
-   * Finalize aggregators for current superstep and share them with workers
-   *
-   * @param masterClient IPC client on master
-   */
-  public void finishSuperstep(MasterClient masterClient) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Start finishing aggregators");
-    }
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isChanged()) {
-        // if master compute changed the value, use the one it chose
-        aggregator.setPreviousAggregatedValue(
-            aggregator.getCurrentAggregatedValue());
-        // reset aggregator for the next superstep
-        aggregator.resetCurrentAggregator();
-      }
-      progressable.progress();
-    }
-
-    // send aggregators to their owners
-    // TODO: if aggregator owner and its value didn't change,
-    //       we don't need to resend it
-    try {
-      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-          aggregatorMap.entrySet()) {
-        masterClient.sendAggregator(entry.getKey(),
-            entry.getValue().getAggregatorClass(),
-            entry.getValue().getPreviousAggregatedValue());
-        progressable.progress();
-      }
-      masterClient.finishSendingAggregatedValues();
-    } catch (IOException e) {
-      throw new IllegalStateException("finishSuperstep: " +
-          "IOException occurred while sending aggregators", e);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Aggregators finished");
-    }
-  }
-
-  /**
-   * Accept aggregated values sent by worker. Every aggregator will be sent
-   * only once, by its owner.
-   * We don't need to count the number of these requests because global
-   * superstep barrier will happen after workers ensure all requests of this
-   * type have been received and processed by master.
-   *
-   * @param aggregatedValuesInput Input in which aggregated values are
-   *                              written in the following format:
-   *                              number_of_aggregators
-   *                              name_1  value_1
-   *                              name_2  value_2
-   *                              ...
-   * @throws IOException
-   */
-  public void acceptAggregatedValues(
-      DataInput aggregatedValuesInput) throws IOException {
-    int numAggregators = aggregatedValuesInput.readInt();
-    for (int i = 0; i < numAggregators; i++) {
-      String aggregatorName = aggregatedValuesInput.readUTF();
-      AggregatorWrapper<Writable> aggregator =
-          aggregatorMap.get(aggregatorName);
-      if (aggregator == null) {
-        throw new IllegalStateException(
-            "acceptAggregatedValues: " +
-                "Master received aggregator which isn't registered: " +
-                aggregatorName);
-      }
-      Writable aggregatorValue = aggregator.createInitialValue();
-      aggregatorValue.readFields(aggregatedValuesInput);
-      aggregator.setCurrentAggregatedValue(aggregatorValue);
-      progressable.progress();
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("acceptAggregatedValues: Accepted one set with " +
-          numAggregators + " aggregated values");
-    }
-  }
-
-  /**
-   * Write aggregators to {@link AggregatorWriter}
-   *
-   * @param superstep      Superstep which just finished
-   * @param superstepState State of the superstep which just finished
-   */
-  public void writeAggregators(long superstep, SuperstepState superstepState) {
-    try {
-      Iterable<Map.Entry<String, Writable>> iter =
-          Iterables.transform(
-              aggregatorMap.entrySet(),
-              new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
-                  Map.Entry<String, Writable>>() {
-                @Override
-                public Map.Entry<String, Writable> apply(
-                    Map.Entry<String, AggregatorWrapper<Writable>> entry) {
-                  progressable.progress();
-                  return new AbstractMap.SimpleEntry<String,
-                      Writable>(entry.getKey(),
-                      entry.getValue().getPreviousAggregatedValue());
-                }
-              });
-      aggregatorWriter.writeAggregator(iter,
-          (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
-              AggregatorWriter.LAST_SUPERSTEP : superstep);
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "coordinateSuperstep: IOException while " +
-              "writing aggregators data", e);
-    }
-  }
-
-  /**
-   * Initialize {@link AggregatorWriter}
-   *
-   * @param service BspService
-   */
-  public void initialize(BspService service) {
-    try {
-      aggregatorWriter.initialize(service.getContext(),
-          service.getApplicationAttempt());
-    } catch (IOException e) {
-      throw new IllegalStateException("initialize: " +
-          "Couldn't initialize aggregatorWriter", e);
-    }
-  }
-
-  /**
-   * Close {@link AggregatorWriter}
-   *
-   * @throws IOException
-   */
-  public void close() throws IOException {
-    aggregatorWriter.close();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(aggregatorMap.size());
-    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-        aggregatorMap.entrySet()) {
-      out.writeUTF(entry.getKey());
-      out.writeUTF(entry.getValue().getAggregatorClass().getName());
-      out.writeBoolean(entry.getValue().isPersistent());
-      entry.getValue().getPreviousAggregatedValue().write(out);
-      progressable.progress();
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    aggregatorMap.clear();
-    int numAggregators = in.readInt();
-    try {
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = in.readUTF();
-        String aggregatorClassName = in.readUTF();
-        boolean isPersistent = in.readBoolean();
-        AggregatorWrapper<Writable> aggregator = registerAggregator(
-            aggregatorName,
-            AggregatorUtils.getAggregatorClass(aggregatorClassName),
-            isPersistent);
-        Writable value = aggregator.createInitialValue();
-        value.readFields(in);
-        aggregator.setPreviousAggregatedValue(value);
-        progressable.progress();
-      }
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("readFields: " +
-          "InstantiationException occurred", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("readFields: " +
-          "IllegalAccessException occurred", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
deleted file mode 100644
index 6e6571b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Master compute can access and change aggregators through this interface
- */
-public interface MasterAggregatorUsage {
-  /**
-   * Register an aggregator in preSuperstep() and/or preApplication(). This
-   * aggregator will have its value reset at the end of each super step.
-   *
-   * @param name of aggregator
-   * @param aggregatorClass Class type of the aggregator
-   * @param <A> Aggregator type
-   * @return True iff aggregator wasn't already registered
-   */
-  <A extends Writable> boolean registerAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException;
-
-  /**
-   * Register persistent aggregator in preSuperstep() and/or
-   * preApplication(). This aggregator will not reset value at the end of
-   * super step.
-   *
-   * @param name of aggregator
-   * @param aggregatorClass Class type of the aggregator
-   * @param <A> Aggregator type
-   * @return True iff aggregator wasn't already registered
-   */
-  <A extends Writable> boolean registerPersistentAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException;
-
-  /**
-   * Get value of an aggregator.
-   *
-   * @param name Name of aggregator
-   * @param <A> Aggregated value
-   * @return Value of the aggregator
-   */
-  <A extends Writable> A getAggregatedValue(String name);
-
-  /**
-   * Sets value of an aggregator.
-   *
-   * @param name Name of aggregator
-   * @param value Value to set
-   * @param <A> Aggregated value
-   */
-  <A extends Writable> void setAggregatedValue(String name, A value);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterCompute.java
deleted file mode 100644
index 4641621..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterCompute.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Interface for defining a master vertex that can perform centralized
- * computation between supersteps. This class will be instantiated on the
- * master node and will run every superstep before the workers do.
- *
- * Communication with the workers should be performed via aggregators. The
- * values of the aggregators are broadcast to the workers before
- * vertex.compute() is called and collected by the master before
- * master.compute() is called. This means aggregator values used by the workers
- * are consistent with aggregator values from the master from the same
- * superstep and aggregators used by the master are consistent with aggregator
- * values from the workers from the previous superstep. Note that the master
- * has to register its own aggregators (it does not call {@link WorkerContext}
- * functions), but it uses all aggregators by default, so useAggregator does
- * not have to be called.
- */
-@SuppressWarnings("rawtypes")
-public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
-    ImmutableClassesGiraphConfigurable {
-  /** If true, the master has requested that the computation be halted. */
-  private boolean halt = false;
-  /** Global graph state **/
-  private GraphState graphState;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-
-  /**
-   * Must be defined by user to specify what the master has to do.
-   */
-  public abstract void compute();
-
-  /**
-   * Initialize the MasterCompute class, this is the place to register
-   * aggregators.
-   */
-  public abstract void initialize() throws InstantiationException,
-    IllegalAccessException;
-
-  /**
-   * Retrieves the current superstep.
-   *
-   * @return Current superstep
-   */
-  public long getSuperstep() {
-    return getGraphState().getSuperstep();
-  }
-
-  /**
-   * Get the total (all workers) number of vertices that
-   * existed in the previous superstep.
-   *
-   * @return Total number of vertices (-1 if first superstep)
-   */
-  public long getTotalNumVertices() {
-    return getGraphState().getTotalNumVertices();
-  }
-
-  /**
-   * Get the total (all workers) number of edges that
-   * existed in the previous superstep.
-   *
-   * @return Total number of edges (-1 if first superstep)
-   */
-  public long getTotalNumEdges() {
-    return getGraphState().getTotalNumEdges();
-  }
-
-  /**
-   * After this is called, the computation will stop, even if there are
-   * still messages in the system or vertices that have not voted to halt.
-   */
-  public void haltComputation() {
-    halt = true;
-  }
-
-  /**
-   * Has the master halted?
-   *
-   * @return True if halted, false otherwise.
-   */
-  public boolean isHalted() {
-    return halt;
-  }
-
-  /**
-   * Get the graph state for all workers.
-   *
-   * @return Graph state for all workers
-   */
-  GraphState getGraphState() {
-    return graphState;
-  }
-
-  /**
-   * Set the graph state for all workers
-   *
-   * @param graphState Graph state for all workers
-   */
-  void setGraphState(GraphState graphState) {
-    this.graphState = graphState;
-  }
-
-  /**
-   * Get the mapper context
-   *
-   * @return Mapper context
-   */
-  public Mapper.Context getContext() {
-    return getGraphState().getContext();
-  }
-
-  @Override
-  public final <A extends Writable> boolean registerAggregator(
-    String name, Class<? extends Aggregator<A>> aggregatorClass)
-    throws InstantiationException, IllegalAccessException {
-    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
-        registerAggregator(name, aggregatorClass);
-  }
-
-  @Override
-  public final <A extends Writable> boolean registerPersistentAggregator(
-      String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
-        registerPersistentAggregator(name, aggregatorClass);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
-        <A>getAggregatedValue(name);
-  }
-
-  @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    getGraphState().getGraphMapper().getMasterAggregatorUsage().
-        setAggregatedValue(name, value);
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterInfo.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterInfo.java
deleted file mode 100644
index 50ad6aa..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterInfo.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Information about the master that is sent to other workers.
- */
-public class MasterInfo extends TaskInfo {
-  /**
-   * Constructor
-   */
-  public MasterInfo() {
-  }
-
-  @Override
-  public String toString() {
-    return "Master(" + super.toString() + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
deleted file mode 100644
index e27de42..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.bsp.ApplicationState;
-import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.counters.GiraphTimers;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.log4j.Logger;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-/**
- * Master thread that will coordinate the activities of the tasks.  It runs
- * on all task processes, but it will only execute its algorithm if it knows
- * it is the "leader" from ZooKeeper.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class MasterThread<I extends WritableComparable, V extends Writable,
-    E extends Writable, M extends Writable> extends Thread {
-  /** Counter group name for the Giraph timers */
-  public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(MasterThread.class);
-  /** Reference to shared BspService */
-  private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
-  /** Context (for counters) */
-  private final Context context;
-  /** Use superstep counters? */
-  private final boolean superstepCounterOn;
-  /** Setup seconds */
-  private double setupSecs = 0d;
-  /** Superstep timer (in seconds) map */
-  private final Map<Long, Double> superstepSecsMap =
-      new TreeMap<Long, Double>();
-
-  /**
-   * Constructor.
-   *
-   * @param bspServiceMaster Master that already exists and setup() has
-   *        been called.
-   * @param context Context from the Mapper.
-   */
-  MasterThread(CentralizedServiceMaster<I, V, E, M> bspServiceMaster,
-      Context context) {
-    super(MasterThread.class.getName());
-    this.bspServiceMaster = bspServiceMaster;
-    this.context = context;
-    GiraphTimers.init(context);
-    superstepCounterOn = context.getConfiguration().getBoolean(
-        GiraphConstants.USE_SUPERSTEP_COUNTERS,
-        GiraphConstants.USE_SUPERSTEP_COUNTERS_DEFAULT);
-  }
-
-  /**
-   * The master algorithm.  The algorithm should be able to withstand
-   * failures and resume as necessary since the master may switch during a
-   * job.
-   */
-  @Override
-  public void run() {
-    // Algorithm:
-    // 1. Become the master
-    // 2. If desired, restart from a manual checkpoint
-    // 3. Run all supersteps until complete
-    try {
-      long startMillis = System.currentTimeMillis();
-      long endMillis = 0;
-      bspServiceMaster.setup();
-      if (bspServiceMaster.becomeMaster()) {
-        // Attempt to create InputSplits if necessary. Bail out if that fails.
-        if (bspServiceMaster.getRestartedSuperstep() !=
-            BspService.UNSET_SUPERSTEP ||
-            (bspServiceMaster.createVertexInputSplits() != -1 &&
-                bspServiceMaster.createEdgeInputSplits() != -1)) {
-          long setupMillis = System.currentTimeMillis() - startMillis;
-          GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
-          setupSecs = setupMillis / 1000.0d;
-          SuperstepState superstepState = SuperstepState.INITIAL;
-          long cachedSuperstep = BspService.UNSET_SUPERSTEP;
-          while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
-            long startSuperstepMillis = System.currentTimeMillis();
-            cachedSuperstep = bspServiceMaster.getSuperstep();
-            GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
-            superstepState = bspServiceMaster.coordinateSuperstep();
-            long superstepMillis = System.currentTimeMillis() -
-                startSuperstepMillis;
-            superstepSecsMap.put(Long.valueOf(cachedSuperstep),
-                superstepMillis / 1000.0d);
-            if (LOG.isInfoEnabled()) {
-              LOG.info("masterThread: Coordination of superstep " +
-                  cachedSuperstep + " took " +
-                  superstepMillis / 1000.0d +
-                  " seconds ended with state " + superstepState +
-                  " and is now on superstep " +
-                  bspServiceMaster.getSuperstep());
-            }
-            if (superstepCounterOn) {
-              GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep).
-                increment(superstepMillis);
-            }
-
-            bspServiceMaster.postSuperstep();
-
-            // If a worker failed, restart from a known good superstep
-            if (superstepState == SuperstepState.WORKER_FAILURE) {
-              bspServiceMaster.restartFromCheckpoint(
-                  bspServiceMaster.getLastGoodCheckpoint());
-            }
-            endMillis = System.currentTimeMillis();
-          }
-          bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
-        }
-      }
-      bspServiceMaster.cleanup();
-      if (!superstepSecsMap.isEmpty()) {
-        GiraphTimers.getInstance().getShutdownMs().
-          increment(System.currentTimeMillis() - endMillis);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("setup: Took " + setupSecs + " seconds.");
-        }
-        for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
-          if (LOG.isInfoEnabled()) {
-            if (entry.getKey().longValue() ==
-                BspService.INPUT_SUPERSTEP) {
-              LOG.info("input superstep: Took " +
-                  entry.getValue() + " seconds.");
-            } else {
-              LOG.info("superstep " + entry.getKey() + ": Took " +
-                  entry.getValue() + " seconds.");
-            }
-          }
-          context.progress();
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("shutdown: Took " +
-              (System.currentTimeMillis() - endMillis) /
-              1000.0d + " seconds.");
-          LOG.info("total: Took " +
-              ((System.currentTimeMillis() - startMillis) /
-              1000.0d) + " seconds.");
-        }
-        GiraphTimers.getInstance().getTotalMs().
-          increment(System.currentTimeMillis() - startMillis);
-      }
-      bspServiceMaster.postApplication();
-      // CHECKSTYLE: stop IllegalCatchCheck
-    } catch (Exception e) {
-      // CHECKSTYLE: resume IllegalCatchCheck
-      bspServiceMaster.failureCleanup(e);
-      LOG.error("masterThread: Master algorithm failed with " +
-          e.getClass().getSimpleName(), e);
-      throw new IllegalStateException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
deleted file mode 100644
index 19d6e0c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.Iterator;
-
-/**
- * Edge-list backed vertex in which several edges may point to the same
- * target vertex (parallel edges).
- * Besides supporting mutable multigraphs, it is meant to keep mutations
- * and edge-based input efficient without resorting to a hash-map backed
- * vertex.
- *
- * Note: removeEdges() here removes all edges pointing to the target vertex
- * and returns how many edges were removed.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class MultiGraphEdgeListVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends EdgeListVertexBase<I, V, E, M> {
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    // No duplicate check is made here: the edge is handed straight to
-    // appendEdge() and success is always reported.
-    appendEdge(edge);
-    return true;
-  }
-
-  @Override
-  public int removeEdges(I targetVertexId) {
-    int numRemoved = 0;
-    Iterator<Edge<I, E>> edgeIterator = getEdges().iterator();
-    while (edgeIterator.hasNext()) {
-      I currentTargetId = edgeIterator.next().getTargetVertexId();
-      if (!currentTargetId.equals(targetVertexId)) {
-        continue;
-      }
-      // Remove through the iterator so the scan can keep going
-      ++numRemoved;
-      edgeIterator.remove();
-    }
-    return numRemoved;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
deleted file mode 100644
index 40e929c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Variant of {@link RepresentativeVertex} that permits parallel edges,
- * i.e. several edges pointing to the same target vertex.
- *
- * Note: removeEdges() here removes all edges pointing to the target vertex
- * and returns how many edges were removed.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class MultiGraphRepresentativeVertex<I extends
-    WritableComparable, V extends Writable, E extends Writable,
-    M extends Writable> extends RepresentativeVertexBase<I, V, E, M> {
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    // Parallel edges are allowed, so no existing-edge check is done
-    appendEdge(edge);
-    return true;
-  }
-
-  @Override
-  public final int removeEdges(I targetVertexId) {
-    int numRemoved = removeAllEdges(targetVertexId);
-    return numRemoved;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/MutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MutableVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/MutableVertex.java
deleted file mode 100644
index 04c17ed..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MutableVertex.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-import java.util.Collections;
-
-/**
- * Vertex that can be mutated: edges can be added or removed on the vertex
- * itself right away, and requests can be sent to change the graph
- * (vertices and edges) before the next superstep.
- * Used by VertexReader to set the properties of a new vertex or mutate the
- * graph.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class MutableVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends Vertex<I, V, E, M> {
-  /**
-   * Add an edge to this vertex; the change takes effect immediately.
-   *
-   * @param edge Edge to add
-   * @return True if the edge was added, false otherwise
-   */
-  public abstract boolean addEdge(Edge<I, E> edge);
-
-  /**
-   * Remove every edge of this vertex that points to the given vertex id.
-   *
-   * @param targetVertexId Id of the target vertex
-   * @return Number of edges that were removed
-   */
-  public abstract int removeEdges(I targetVertexId);
-
-  /**
-   * Request creation of a vertex with the given initial edges; the vertex
-   * will be available during the next superstep.
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   * @param edges Initial edges
-   * @throws IOException propagated from the request processor
-   */
-  public void addVertexRequest(I id, V value, Iterable<Edge<I, E>> edges)
-    throws IOException {
-    Vertex<I, V, E, M> newVertex = getConf().createVertex();
-    newVertex.initialize(id, value, edges);
-    getGraphState().getWorkerClientRequestProcessor().
-        addVertexRequest(newVertex);
-  }
-
-  /**
-   * Request creation of a vertex without any edges; the vertex will be
-   * available during the next superstep.
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   * @throws IOException propagated from the request processor
-   */
-  public void addVertexRequest(I id, V value) throws IOException {
-    Iterable<Edge<I, E>> noEdges = Collections.emptyList();
-    addVertexRequest(id, value, noEdges);
-  }
-
-  /**
-   * Request removal of a vertex from the graph
-   * (applied just prior to the next superstep).
-   *
-   * @param vertexId Id of the vertex to be removed.
-   * @throws IOException propagated from the request processor
-   */
-  public void removeVertexRequest(I vertexId) throws IOException {
-    getGraphState().
-        getWorkerClientRequestProcessor().
-        removeVertexRequest(vertexId);
-  }
-
-  /**
-   * Request addition of an edge to a vertex in the graph
-   * (processed just prior to the next superstep).
-   *
-   * @param sourceVertexId Source vertex id of edge
-   * @param edge Edge to add
-   * @throws IOException propagated from the request processor
-   */
-  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
-    throws IOException {
-    getGraphState().
-        getWorkerClientRequestProcessor().
-        addEdgeRequest(sourceVertexId, edge);
-  }
-
-  /**
-   * Request removal of all edges that go from the given source vertex to
-   * the given target vertex (processed just prior to the next superstep).
-   *
-   * @param sourceVertexId Source vertex id
-   * @param targetVertexId Target vertex id
-   * @throws IOException propagated from the request processor
-   */
-  public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
-    throws IOException {
-    getGraphState().
-        getWorkerClientRequestProcessor().
-        removeEdgesRequest(sourceVertexId, targetVertexId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
deleted file mode 100644
index 673b402..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Representative vertex that holds at most one edge per target vertex.
- *
- * This vertex should only be used in conjunction with ByteArrayPartition since
- * it has special code to deserialize by reusing objects and not instantiating
- * new ones.  If used without ByteArrayPartition, it will cause a lot of
- * wasted memory.
- *
- * Also, this vertex is optimized for space and not efficient for either adding
- * or random access of edges.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class RepresentativeVertex<
-    I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends RepresentativeVertexBase<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(RepresentativeVertex.class);
-
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    // Scanning for an existing edge walks over (and deserializes) the
-    // stored edges, which makes addEdge() very expensive.
-    // Hopefully the user set all the edges in setEdges().
-    I newTargetId = edge.getTargetVertexId();
-    boolean alreadyPresent = false;
-    for (Edge<I, E> existingEdge : getEdges()) {
-      if (existingEdge.getTargetVertexId().equals(newTargetId)) {
-        alreadyPresent = true;
-        break;
-      }
-    }
-
-    if (alreadyPresent) {
-      LOG.warn("addEdge: Vertex=" + getId() +
-          ": already added an edge value for target vertex id " +
-          newTargetId);
-      return false;
-    }
-
-    appendEdge(edge);
-    return true;
-  }
-
-  @Override
-  public final int removeEdges(I targetVertexId) {
-    // addEdge() rejects duplicate targets, so only the first match is
-    // removed here.
-    if (removeFirstEdge(targetVertexId)) {
-      return 1;
-    }
-    return 0;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
deleted file mode 100644
index d0e4bfb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.utils.ExtendedDataInput;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Common base class for representative vertices.
- *
- * Edges are kept in serialized form in a byte array.  Iteration
- * deserializes each edge into one shared {@link Edge} object (the
- * "representative" edge), so the edge returned by an iterator is
- * overwritten by the following call to next() and only one iterator may be
- * used at a time.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class RepresentativeVertexBase<
-    I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends MutableVertex<I, V, E, M> implements Iterable<Edge<I, E>> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(RepresentativeVertexBase.class);
-  /** Representative edge */
-  private final Edge<I, E> representativeEdge = new Edge<I, E>();
-  /** Serialized edges */
-  private byte[] serializedEdges;
-  /** Byte used in serializedEdges */
-  private int serializedEdgesBytesUsed;
-  /** Number of edges */
-  private int edgeCount;
-
-  /**
-   * Append an edge to the serialized representation.
-   *
-   * @param edge Edge to append
-   */
-  protected void appendEdge(Edge<I, E> edge) {
-    ExtendedDataOutput extendedDataOutput =
-        getConf().createExtendedDataOutput(
-            serializedEdges, serializedEdgesBytesUsed);
-    try {
-      edge.getTargetVertexId().write(extendedDataOutput);
-      edge.getValue().write(extendedDataOutput);
-    } catch (IOException e) {
-      // Keep the cause so the underlying I/O failure is not lost
-      throw new IllegalStateException("addEdge: Failed to write to the " +
-          "new byte array", e);
-    }
-    // Take the array back from the output
-    // (presumably it may have been reallocated to grow - TODO confirm)
-    serializedEdges = extendedDataOutput.getByteArray();
-    serializedEdgesBytesUsed = extendedDataOutput.getPos();
-    ++edgeCount;
-  }
-
-  /**
-   * Remove the first edge pointing to a target vertex.
-   *
-   * @param targetVertexId Target vertex id
-   * @return True if one such edge was found and removed.
-   */
-  protected boolean removeFirstEdge(I targetVertexId) {
-    // Note that this is very expensive (deserializes all edges
-    // in a removeEdges() request).
-    // Hopefully the user set all the edges correctly in setEdges().
-    RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
-    int edgeStartOffset = 0;
-    while (iterator.hasNext()) {
-      Edge<I, E> edge = iterator.next();
-      int edgeEndOffset = iterator.extendedDataInput.getPos();
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        // Close the gap by shifting everything after the edge forward
-        System.arraycopy(serializedEdges, edgeEndOffset,
-            serializedEdges, edgeStartOffset,
-            serializedEdgesBytesUsed - edgeEndOffset);
-        serializedEdgesBytesUsed -= edgeEndOffset - edgeStartOffset;
-        --edgeCount;
-        return true;
-      }
-      edgeStartOffset = edgeEndOffset;
-    }
-
-    return false;
-  }
-
-  /**
-   * Remove all edges pointing to a target vertex.
-   *
-   * @param targetVertexId Target vertex id
-   * @return The number of removed edges
-   */
-  protected int removeAllEdges(I targetVertexId) {
-    // Note that this is very expensive (deserializes all edges
-    // in a removeEdges() request).
-    // Hopefully the user set all the edges correctly in setEdges().
-    RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
-    int removedCount = 0;
-    // Byte ranges [start, end) of the edges to drop, in increasing order
-    List<Integer> foundStartOffsets = new LinkedList<Integer>();
-    List<Integer> foundEndOffsets = new LinkedList<Integer>();
-    int lastStartOffset = 0;
-    while (iterator.hasNext()) {
-      Edge<I, E> edge = iterator.next();
-      int edgeEndOffset = iterator.extendedDataInput.getPos();
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        foundStartOffsets.add(lastStartOffset);
-        foundEndOffsets.add(edgeEndOffset);
-        ++removedCount;
-      }
-      lastStartOffset = edgeEndOffset;
-    }
-    if (removedCount == 0) {
-      return 0;
-    }
-
-    // Compact the array only after the scan is finished.  Every run of
-    // surviving bytes (from the end of one removed edge up to the start of
-    // the next removed edge, or up to the end of the data) is moved to the
-    // running write position, so that earlier removals are accounted for
-    // when later runs are placed.
-    Iterator<Integer> startOffsetIter = foundStartOffsets.iterator();
-    int writeOffset = startOffsetIter.next();
-    for (Integer foundEndOffset : foundEndOffsets) {
-      int keepEndOffset = startOffsetIter.hasNext() ?
-          startOffsetIter.next() : serializedEdgesBytesUsed;
-      int keepLength = keepEndOffset - foundEndOffset;
-      System.arraycopy(serializedEdges, foundEndOffset,
-          serializedEdges, writeOffset, keepLength);
-      writeOffset += keepLength;
-    }
-
-    serializedEdgesBytesUsed = writeOffset;
-    edgeCount -= removedCount;
-    return removedCount;
-  }
-
-  @Override
-  public final void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
-    // Make sure the initial values exist
-    representativeEdge.setTargetVertexId(getConf().createVertexId());
-    representativeEdge.setValue(getConf().createEdgeValue());
-    super.initialize(id, value, edges);
-  }
-
-  @Override
-  public final void initialize(I id, V value) {
-    // Make sure the initial values exist
-    representativeEdge.setTargetVertexId(getConf().createVertexId());
-    representativeEdge.setValue(getConf().createEdgeValue());
-    super.initialize(id, value);
-  }
-
-  /**
-   * Iterator that uses the representative edge (only one iterator allowed
-   * at a time)
-   */
-  private final class RepresentativeEdgeIterator implements
-      Iterator<Edge<I, E>> {
-    /** Input for processing the bytes */
-    private final ExtendedDataInput extendedDataInput;
-
-    /** Constructor. */
-    RepresentativeEdgeIterator() {
-      extendedDataInput = getConf().createExtendedDataInput(
-          serializedEdges, 0, serializedEdgesBytesUsed);
-    }
-
-    @Override
-    public boolean hasNext() {
-      return serializedEdges != null && extendedDataInput.available() > 0;
-    }
-
-    @Override
-    public Edge<I, E> next() {
-      try {
-        // Deserialize into the shared edge object instead of allocating
-        representativeEdge.getTargetVertexId().readFields(extendedDataInput);
-        representativeEdge.getValue().readFields(extendedDataInput);
-      } catch (IOException e) {
-        throw new IllegalStateException("next: Failed on pos " +
-            extendedDataInput.getPos() + " edge " + representativeEdge, e);
-      }
-      return representativeEdge;
-    }
-
-    @Override
-    public void remove() {
-      // The Iterator contract specifies this exception for an unsupported
-      // remove(); use removeFirstEdge() / removeAllEdges() instead.
-      throw new UnsupportedOperationException("remove: Not supported");
-    }
-  }
-
-  @Override
-  public final Iterator<Edge<I, E>> iterator() {
-    return new RepresentativeEdgeIterator();
-  }
-
-  @Override
-  public final void setEdges(Iterable<Edge<I, E>> edges) {
-    ExtendedDataOutput extendedOutputStream =
-        getConf().createExtendedDataOutput();
-    // Count from zero: the previous edges are replaced, not extended
-    int newEdgeCount = 0;
-    if (edges != null) {
-      for (Edge<I, E> edge : edges) {
-        try {
-          edge.getTargetVertexId().write(extendedOutputStream);
-          edge.getValue().write(extendedOutputStream);
-        } catch (IOException e) {
-          throw new IllegalStateException("setEdges: Failed to serialize " +
-              edge, e);
-        }
-        ++newEdgeCount;
-      }
-    }
-    serializedEdges = extendedOutputStream.getByteArray();
-    serializedEdgesBytesUsed = extendedOutputStream.getPos();
-    edgeCount = newEdgeCount;
-  }
-
-  @Override
-  public final Iterable<Edge<I, E>> getEdges() {
-    return this;
-  }
-
-  @Override
-  public final int getNumEdges() {
-    return edgeCount;
-  }
-
-  @Override
-  public final void readFields(DataInput in) throws IOException {
-    // Ensure these objects are present
-    if (representativeEdge.getTargetVertexId() == null) {
-      representativeEdge.setTargetVertexId(getConf().createVertexId());
-    }
-
-    if (representativeEdge.getValue() == null) {
-      representativeEdge.setValue(getConf().createEdgeValue());
-    }
-
-    // Reuse the existing id / value objects when there are any
-    I vertexId = getId();
-    if (vertexId == null) {
-      vertexId = getConf().createVertexId();
-    }
-    vertexId.readFields(in);
-
-    V vertexValue = getValue();
-    if (vertexValue == null) {
-      vertexValue = getConf().createVertexValue();
-    }
-    vertexValue.readFields(in);
-
-    initialize(vertexId, vertexValue);
-
-    // Layout written by write(): byte count, edge bytes, edge count, halted
-    serializedEdgesBytesUsed = in.readInt();
-    // Only create a new buffer if the old one isn't big enough
-    if (serializedEdges == null ||
-        serializedEdgesBytesUsed > serializedEdges.length) {
-      serializedEdges = new byte[serializedEdgesBytesUsed];
-    }
-    in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
-    edgeCount = in.readInt();
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public final void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    // The edges are already serialized, so the used bytes are copied as is
-    out.writeInt(serializedEdgesBytesUsed);
-    out.write(serializedEdges, 0, serializedEdgesBytesUsed);
-    out.writeInt(edgeCount);
-
-    out.writeBoolean(isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
deleted file mode 100644
index 4843dd5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.utils.EdgeIterables;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Mutable vertex whose edges carry no value: every edge value is
- * {@link NullWritable}, so the edges are fully described by the ids of the
- * neighboring vertices.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <M> Message data
- */
-public abstract class SimpleMutableVertex<I extends WritableComparable,
-    V extends Writable, M extends Writable> extends MutableVertex<I, V,
-    NullWritable, M> {
-  /**
-   * Set the neighbors of this vertex.
-   *
-   * @param neighbors Iterable of destination vertex ids.
-   */
-  public abstract void setNeighbors(Iterable<I> neighbors);
-
-  @Override
-  public void setEdges(Iterable<Edge<I, NullWritable>> edges) {
-    Iterable<I> neighborIds = EdgeIterables.getNeighbors(edges);
-    setNeighbors(neighborIds);
-  }
-
-  /**
-   * Get a read-only view of the neighbors of this
-   * vertex, i.e. the target vertices of its out-edges.
-   *
-   * @return the neighbors (sort order determined by subclass implementation).
-   */
-  public abstract Iterable<I> getNeighbors();
-
-  @Override
-  public Iterable<Edge<I, NullWritable>> getEdges() {
-    Iterable<I> neighborIds = getNeighbors();
-    return EdgeIterables.getEdges(neighborIds);
-  }
-
-  @Override
-  public NullWritable getEdgeValue(I targetVertexId) {
-    // Returned for any id, whether or not such an edge exists
-    return NullWritable.get();
-  }
-
-  /**
-   * Add an edge for this vertex (happens immediately)
-   *
-   * @param targetVertexId target vertex
-   * @return Return true if succeeded, false otherwise
-   */
-  public abstract boolean addEdge(I targetVertexId);
-
-  @Override
-  public boolean addEdge(Edge<I, NullWritable> edge) {
-    I targetVertexId = edge.getTargetVertexId();
-    return addEdge(targetVertexId);
-  }
-
-  /**
-   * Request to add an edge of a vertex in the graph
-   * (processed just prior to the next superstep)
-   *
-   * @param sourceVertexId Source vertex id of edge
-   * @throws IOException propagated from the request processor
-   */
-  public void addEdgeRequest(I sourceVertexId) throws IOException {
-    // NOTE(review): the requested edge uses sourceVertexId as its target
-    // as well, i.e. this asks for a self-loop - confirm this is intended.
-    Edge<I, NullWritable> requestedEdge =
-        new Edge<I, NullWritable>(sourceVertexId, NullWritable.get());
-    getGraphState().getWorkerClientRequestProcessor().
-        addEdgeRequest(sourceVertexId, requestedEdge);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    // Same field order as write(): id, value, edge count, targets, halted
-    I id = getConf().createVertexId();
-    id.readFields(in);
-    V value = getConf().createVertexValue();
-    value.readFields(in);
-
-    int edgeTotal = in.readInt();
-    List<Edge<I, NullWritable>> readEdges =
-        Lists.newArrayListWithCapacity(edgeTotal);
-    for (int remaining = edgeTotal; remaining > 0; --remaining) {
-      I neighborId = getConf().createVertexId();
-      neighborId.readFields(in);
-      readEdges.add(new Edge<I, NullWritable>(neighborId, NullWritable.get()));
-    }
-
-    initialize(id, value, readEdges);
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    // Only the target ids are written since the edges carry no value
-    out.writeInt(getNumEdges());
-    for (I neighborId : getNeighbors()) {
-      neighborId.write(out);
-    }
-
-    out.writeBoolean(isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/SimpleVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/SimpleVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/SimpleVertex.java
deleted file mode 100644
index 0d56d95..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/SimpleVertex.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.utils.EdgeIterables;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Vertex whose edges carry no value: every edge value is
- * {@link NullWritable}, so the edges are fully described by the ids of the
- * neighboring vertices.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <M> Message data
- */
-public abstract class SimpleVertex<I extends WritableComparable,
-    V extends Writable, M extends Writable> extends Vertex<I, V,
-    NullWritable, M> {
-  /**
-   * Set the neighbors of this vertex.
-   *
-   * @param neighbors Iterable of destination vertex ids.
-   */
-  public abstract void setNeighbors(Iterable<I> neighbors);
-
-  @Override
-  public void setEdges(Iterable<Edge<I, NullWritable>> edges) {
-    Iterable<I> neighborIds = EdgeIterables.getNeighbors(edges);
-    setNeighbors(neighborIds);
-  }
-
-  /**
-   * Get a read-only view of the neighbors of this
-   * vertex, i.e. the target vertices of its out-edges.
-   *
-   * @return the neighbors (sort order determined by subclass implementation).
-   */
-  public abstract Iterable<I> getNeighbors();
-
-  @Override
-  public Iterable<Edge<I, NullWritable>> getEdges() {
-    Iterable<I> neighborIds = getNeighbors();
-    return EdgeIterables.getEdges(neighborIds);
-  }
-
-  @Override
-  public NullWritable getEdgeValue(I targetVertexId) {
-    // Returned for any id, whether or not such an edge exists
-    return NullWritable.get();
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    // Same field order as write(): id, value, edge count, targets, halted
-    I id = getConf().createVertexId();
-    id.readFields(in);
-    V value = getConf().createVertexValue();
-    value.readFields(in);
-
-    int neighborTotal = in.readInt();
-    List<I> readNeighbors = Lists.newArrayListWithCapacity(neighborTotal);
-    for (int remaining = neighborTotal; remaining > 0; --remaining) {
-      I neighborId = getConf().createVertexId();
-      neighborId.readFields(in);
-      readNeighbors.add(neighborId);
-    }
-
-    initialize(id, value);
-    setNeighbors(readNeighbors);
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    // Only the target ids are written since the edges carry no value
-    out.writeInt(getNumEdges());
-    for (I neighborId : getNeighbors()) {
-      neighborId.write(out);
-    }
-
-    out.writeBoolean(isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java b/giraph-core/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
deleted file mode 100644
index abdba44..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import java.io.IOException;
-import java.util.Map.Entry;
-
-import com.google.common.base.Charsets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-
-/**
- * Default implementation of {@link AggregatorWriter}. Each line consists of
- * text and contains the superstep, the aggregator name and the aggregator
- * value (see {@link #aggregatorToString(String, Writable, long)}).
- */
-public class TextAggregatorWriter implements AggregatorWriter {
-  /** The filename of the outputfile */
-  public static final String FILENAME =
-      "giraph.textAggregatorWriter.filename";
-  /** Signal for "never write" frequency */
-  public static final int NEVER = 0;
-  /** Signal for "write only the final values" frequency */
-  public static final int AT_THE_END = -1;
-  /**
-   * Signal for "write values in every superstep" frequency. Must differ
-   * from {@link #AT_THE_END}; a frequency of 1 means every superstep.
-   */
-  public static final int ALWAYS = 1;
-  /** The frequency of writing:
-   *  - NEVER: never write, files aren't created at all
-   *  - AT_THE_END: aggregators are written only when the computation is over
-   *  - ALWAYS: aggregators are written in every superstep
-   *  - int: i.e. 1 is every superstep, 2 every two supersteps and so on
-   */
-  public static final String FREQUENCY =
-      "giraph.textAggregatorWriter.frequency";
-  /** Default filename for dumping aggregator values */
-  private static final String DEFAULT_FILENAME = "aggregatorValues";
-  /** Handle to the outputfile (stays null when the frequency is NEVER) */
-  protected FSDataOutputStream output;
-  /** Write every "frequency" supersteps */
-  private int frequency;
-
-  /**
-   * Read the frequency and filename from the configuration and, unless the
-   * frequency is {@link #NEVER}, create the output file named
-   * "filename_attempt".
-   *
-   * @param context Mapper context providing the configuration
-   * @param attempt Attempt number, appended to the filename
-   * @throws IOException If the file system cannot be accessed
-   * @throws RuntimeException If the output file already exists
-   */
-  @Override
-  @SuppressWarnings("rawtypes")
-  public void initialize(Context context, long attempt) throws IOException {
-    Configuration conf = context.getConfiguration();
-    frequency = conf.getInt(FREQUENCY, NEVER);
-    String filename = conf.get(FILENAME, DEFAULT_FILENAME);
-    if (frequency != NEVER) {
-      Path p = new Path(filename + "_" + attempt);
-      FileSystem fs = FileSystem.get(conf);
-      if (fs.exists(p)) {
-        throw new RuntimeException("aggregatorWriter file already" +
-            " exists: " + p.getName());
-      }
-      output = fs.create(p);
-    }
-  }
-
-  /**
-   * Write one line per aggregator to the output file and flush it, if the
-   * configured frequency selects this superstep.
-   *
-   * @param aggregatorMap Aggregator names with their values
-   * @param superstep Superstep the values belong to
-   * @throws IOException If writing to the output file fails
-   */
-  @Override
-  public void writeAggregator(
-      Iterable<Entry<String, Writable>> aggregatorMap,
-      long superstep) throws IOException {
-    if (shouldWrite(superstep)) {
-      for (Entry<String, Writable> entry : aggregatorMap) {
-        byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
-            superstep).getBytes(Charsets.UTF_8);
-        output.write(bytes, 0, bytes.length);
-      }
-      output.flush();
-    }
-  }
-
-  /**
-   * Implements the way an aggregator is converted into a String.
-   * Override this if you want to implement your own text format.
-   * The default format is "superstep=S&lt;TAB&gt;name=value" followed by
-   * a newline.
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param value Value of aggregator
-   * @param superstep Current superstep
-   * @return The String representation for the aggregator
-   */
-  protected String aggregatorToString(String aggregatorName,
-      Writable value,
-      long superstep) {
-    return new StringBuilder("superstep=").append(superstep).append("\t")
-        .append(aggregatorName).append("=").append(value).append("\n")
-        .toString();
-  }
-
-  /**
-   * Should write this superstep?
-   *
-   * @param superstep Superstep to check
-   * @return True if should write, false otherwise
-   */
-  private boolean shouldWrite(long superstep) {
-    // AT_THE_END must be decided on its own: it is -1, and "x % -1" is 0
-    // for every x, so the modulo test below would select every superstep.
-    if (frequency == AT_THE_END) {
-      return superstep == LAST_SUPERSTEP;
-    }
-    return frequency != NEVER && superstep % frequency == 0;
-  }
-
-  /**
-   * Close the output file if one was opened.
-   *
-   * @throws IOException If closing the output file fails
-   */
-  @Override
-  public void close() throws IOException {
-    if (output != null) {
-      output.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
deleted file mode 100644
index 6db4735..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Basic interface for writing a BSP application for computation.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class Vertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements WorkerAggregatorUsage, Writable,
-    ImmutableClassesGiraphConfigurable<I, V, E, M> {
-  /** Vertex id. */
-  private I id;
-  /** Vertex value. */
-  private V value;
-  /** If true, do not do anymore computation on this vertex. */
-  private boolean halt;
-  /** Global graph state **/
-  private GraphState<I, V, E, M> graphState;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-
-  /**
-   * This method must be called after instantiation of a vertex
-   * with ImmutableClassesGiraphConfiguration
-   * unless deserialization from readFields() is
-   * called.
-   *
-   * @param id Will be the vertex id
-   * @param value Will be the vertex value
-   * @param edges Iterable of edges
-   */
-  public void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
-    this.id = id;
-    this.value = value;
-    setEdges(edges);
-  }
-
-  /**
-   * This method only sets id and value. Can be used by Vertex
-   * implementations in readFields().
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   */
-  public void initialize(I id, V value) {
-    this.id = id;
-    this.value = value;
-    setEdges(Collections.<Edge<I, E>>emptyList());
-  }
-
-  /**
-   * Set the outgoing edges for this vertex.
-   *
-   * @param edges Iterable of edges
-   */
-  public abstract void setEdges(Iterable<Edge<I, E>> edges);
-
-  /**
-   * Must be defined by user to do computation on a single Vertex.
-   *
-   * @param messages Messages that were sent to this vertex in the previous
-   *                 superstep.  Each message is only guaranteed to have
-   *                 a life expectancy as long as next() is not called.
-   * @throws IOException
-   */
-  public abstract void compute(Iterable<M> messages) throws IOException;
-
-  /**
-   * Retrieves the current superstep.
-   *
-   * @return Current superstep
-   */
-  public long getSuperstep() {
-    return getGraphState().getSuperstep();
-  }
-
-  /**
-   * Get the vertex id.
-   *
-   * @return My vertex id.
-   */
-  public I getId() {
-    return id;
-  }
-
-  /**
-   * Get the vertex value (data stored with vertex)
-   *
-   * @return Vertex value
-   */
-  public V getValue() {
-    return value;
-  }
-
-  /**
-   * Set the vertex data (immediately visible in the computation)
-   *
-   * @param value Vertex data to be set
-   */
-  public void setValue(V value) {
-    this.value = value;
-  }
-
-  /**
-   * Get the total (all workers) number of vertices that
-   * existed in the previous superstep.
-   *
-   * @return Total number of vertices (-1 if first superstep)
-   */
-  public long getTotalNumVertices() {
-    return getGraphState().getTotalNumVertices();
-  }
-
-  /**
-   * Get the total (all workers) number of edges that
-   * existed in the previous superstep.
-   *
-   * @return Total number of edges (-1 if first superstep)
-   */
-  public long getTotalNumEdges() {
-    return getGraphState().getTotalNumEdges();
-  }
-
-  /**
-   * Get a read-only view of the out-edges of this vertex.
-   *
-   * @return the out edges (sort order determined by subclass implementation).
-   */
-  public abstract Iterable<Edge<I, E>> getEdges();
-
-  /**
-   * Does an edge with the target vertex id exist?
-   *
-   * @param targetVertexId Target vertex id to check
-   * @return true if there is an edge to the target
-   */
-  public boolean hasEdge(I targetVertexId) {
-    for (Edge<I, E> edge : getEdges()) {
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Get the edge value associated with a target vertex id.
-   *
-   * @param targetVertexId Target vertex id to check
-   *
-   * @return the value of the edge to targetVertexId (or null if there
-   *         is no edge to it)
-   */
-  public E getEdgeValue(I targetVertexId) {
-    for (Edge<I, E> edge : getEdges()) {
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        return edge.getValue();
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get the number of outgoing edges on this vertex.
-   *
-   * @return the total number of outbound edges from this vertex
-   */
-  public int getNumEdges() {
-    return Iterables.size(getEdges());
-  }
-
-  /**
-   * Send a message to a vertex id.  The message should not be mutated after
-   * this method returns or else undefined results could occur.
-   *
-   * @param id Vertex id to send the message to
-   * @param message Message data to send.  Note that after the message is sent,
-   *        the user should not modify the object.
-   */
-  public void sendMessage(I id, M message) {
-    if (message == null) {
-      throw new IllegalArgumentException(
-          "sendMessage: Cannot send null message to " + id);
-    }
-    if (graphState.getWorkerClientRequestProcessor().
-          sendMessageRequest(id, message)) {
-      graphState.getGraphMapper().notifySentMessages();
-    }
-  }
-
-  /**
-   * Lookup WorkerInfo for myself.
-   *
-   * @return WorkerInfo about worker holding this Vertex.
-   */
-  public WorkerInfo getMyWorkerInfo() {
-    return getVertexWorkerInfo(id);
-  }
-
-  /**
-   * Lookup WorkerInfo for a Vertex.
-   *
-   * @param vertexId VertexId to lookup
-   * @return WorkerInfo about worker holding this Vertex.
-   */
-  public WorkerInfo getVertexWorkerInfo(I vertexId) {
-    return getVertexPartitionOwner(vertexId).getWorkerInfo();
-  }
-
-  /**
-   * Lookup PartitionOwner for a Vertex
-   *
-   * @param vertexId id of Vertex to look up.
-   * @return PartitionOwner holding Vertex
-   */
-  private PartitionOwner getVertexPartitionOwner(I vertexId) {
-    return getGraphState().getWorkerClientRequestProcessor().
-        getVertexPartitionOwner(vertexId);
-  }
-
-  /**
-   * Send a message to all edges.
-   *
-   * @param message Message sent to all edges.
-   */
-  public void sendMessageToAllEdges(M message) {
-    for (Edge<I, E> edge : getEdges()) {
-      sendMessage(edge.getTargetVertexId(), message);
-    }
-  }
-
-  /**
-   * After this is called, the compute() code will no longer be called for
-   * this vertex unless a message is sent to it.  Then the compute() code
-   * will be called once again until this function is called.  The
-   * application finishes only when all vertices vote to halt.
-   */
-  public void voteToHalt() {
-    halt = true;
-  }
-
-  /**
-   * Re-activate vertex if halted.
-   */
-  public void wakeUp() {
-    halt = false;
-  }
-
-  /**
-   * Is this vertex done?
-   *
-   * @return True if halted, false otherwise.
-   */
-  public boolean isHalted() {
-    return halt;
-  }
-
-  /**
-   * Get the graph state for all workers.
-   *
-   * @return Graph state for all workers
-   */
-  GraphState<I, V, E, M> getGraphState() {
-    return graphState;
-  }
-
-  /**
-   * Set the graph state for all workers
-   *
-   * @param graphState Graph state for all workers
-   */
-  void setGraphState(GraphState<I, V, E, M> graphState) {
-    this.graphState = graphState;
-  }
-
-  /**
-   * Get the mapper context
-   *
-   * @return Mapper context
-   */
-  public Mapper.Context getContext() {
-    return getGraphState().getContext();
-  }
-
-  /**
-   * Get the worker context
-   *
-   * @return WorkerContext context
-   */
-  public WorkerContext getWorkerContext() {
-    return getGraphState().getGraphMapper().getWorkerContext();
-  }
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    getGraphState().getWorkerAggregatorUsage().
-        aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return getGraphState().getWorkerAggregatorUsage().
-        <A>getAggregatedValue(name);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    I vertexId = (I) getConf().createVertexId();
-    vertexId.readFields(in);
-    V vertexValue = (V) getConf().createVertexValue();
-    vertexValue.readFields(in);
-
-    int numEdges = in.readInt();
-    List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(numEdges);
-    for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = (I) getConf().createVertexId();
-      targetVertexId.readFields(in);
-      E edgeValue = (E) getConf().createEdgeValue();
-      edgeValue.readFields(in);
-      edges.add(new Edge<I, E>(targetVertexId, edgeValue));
-    }
-
-    initialize(vertexId, vertexValue, edges);
-
-    readHaltBoolean(in);
-  }
-
-  /**
-   * Helper method for subclasses which implement their own readFields() to use.
-   *
-   * @param in DataInput to read from.
-   * @throws IOException If anything goes wrong during read.
-   */
-  protected void readHaltBoolean(DataInput in) throws IOException {
-    halt = in.readBoolean();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    out.writeInt(getNumEdges());
-    for (Edge<I, E> edge : getEdges()) {
-      edge.getTargetVertexId().write(out);
-      edge.getValue().write(out);
-    }
-
-    out.writeBoolean(halt);
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public String toString() {
-    return "Vertex(id=" + getId() + ",value=" + getValue() +
-        ",#edges=" + getNumEdges() + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
index 957cf82..b529202 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 
 import java.util.List;
 
+import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
deleted file mode 100644
index 9824b69..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Use this to load data for a BSP application.  Note that the InputSplit must
- * also implement Writable.  The InputSplits will determine the partitioning of
- * vertices across the mappers, so keep that in consideration when implementing
- * getSplits().
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class VertexInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements GiraphInputFormat {
-  /**
-   * Logically split the vertices for a graph processing application.
-   *
-   * Each {@link InputSplit} is then assigned to a worker for processing.
-   *
-   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
-   * input files are not physically split into chunks. For example, a split
-   * could be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The
-   * InputFormat also creates the {@link VertexReader} to read the
-   * {@link InputSplit}.
-   *
-   * Also, the number of workers is a hint given to the developer to try to
-   * intelligently determine how many splits to create (if this is
-   * adjustable) at runtime.
-   *
-   * @param context Context of the job
-   * @param numWorkers Number of workers used for this job
-   * @return a list of {@link InputSplit}s for the job.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  @Override
-  public abstract List<InputSplit> getSplits(
-    JobContext context, int numWorkers)
-    throws IOException, InterruptedException;
-
-  /**
-   * Create a vertex reader for a given split. The framework will call
-   * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
-   * the split is used.
-   *
-   * @param split the split to be read
-   * @param context the information about the task
-   * @return a new vertex reader
-   * @throws IOException
-   */
-  public abstract VertexReader<I, V, E, M> createVertexReader(
-      InputSplit split,
-      TaskAttemptContext context) throws IOException;
-}