You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC

[15/23] GIRAPH-409: Refactor / cleanups (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
deleted file mode 100644
index e1e6235..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.GiraphMetricsRegistry;
-import org.apache.giraph.utils.LoggerUtils;
-import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import com.yammer.metrics.core.Counter;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Load as many vertex input splits as possible.
- * Every thread will has its own instance of WorkerClientRequestProcessor
- * to send requests.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class VertexInputSplitsCallable<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends InputSplitsCallable<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(VertexInputSplitsCallable.class);
-  /** Total vertices loaded */
-  private long totalVerticesLoaded = 0;
-  /** Total edges loaded */
-  private long totalEdgesLoaded = 0;
-  /** Input split max vertices (-1 denotes all) */
-  private final long inputSplitMaxVertices;
-  /** Bsp service worker (only use thread-safe methods) */
-  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
-
-  // Metrics
-  /** number of vertices loaded counter */
-  private final Counter verticesLoadedCounter;
-  /** number of edges loaded counter */
-  private final Counter edgesLoadedCounter;
-
-  /**
-   * Constructor.
-   *
-   * @param context Context
-   * @param graphState Graph state
-   * @param configuration Configuration
-   * @param bspServiceWorker service worker
-   * @param inputSplitPathList List of the paths of the input splits
-   * @param workerInfo This worker's info
-   * @param zooKeeperExt Handle to ZooKeeperExt
-   */
-  public VertexInputSplitsCallable(
-      Mapper<?, ?, ?, ?>.Context context,
-      GraphState<I, V, E, M> graphState,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      BspServiceWorker<I, V, E, M> bspServiceWorker,
-      List<String> inputSplitPathList,
-      WorkerInfo workerInfo,
-      ZooKeeperExt zooKeeperExt)  {
-    super(context, graphState, configuration, bspServiceWorker,
-        inputSplitPathList, workerInfo, zooKeeperExt,
-        BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
-        BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
-        bspServiceWorker.vertexInputSplitsEvents);
-
-    inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
-    this.bspServiceWorker = bspServiceWorker;
-
-    // Initialize Metrics
-    GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
-    verticesLoadedCounter = jobMetrics.getCounter(COUNTER_VERTICES_LOADED);
-    edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED);
-  }
-
-  /**
-   * Read vertices from input split.  If testing, the user may request a
-   * maximum number of vertices to be read from an input split.
-   *
-   * @param inputSplit Input split to process with vertex reader
-   * @param graphState Current graph state
-   * @return Vertices and edges loaded from this input split
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  @Override
-  protected VertexEdgeCount readInputSplit(
-      InputSplit inputSplit,
-      GraphState<I, V, E, M> graphState)
-    throws IOException, InterruptedException {
-    VertexInputFormat<I, V, E, M> vertexInputFormat =
-        configuration.createVertexInputFormat();
-    VertexReader<I, V, E, M> vertexReader =
-        vertexInputFormat.createVertexReader(inputSplit, context);
-    vertexReader.initialize(inputSplit, context);
-    long inputSplitVerticesLoaded = 0;
-    long inputSplitEdgesLoaded = 0;
-    while (vertexReader.nextVertex()) {
-      Vertex<I, V, E, M> readerVertex =
-          vertexReader.getCurrentVertex();
-      if (readerVertex.getId() == null) {
-        throw new IllegalArgumentException(
-            "readInputSplit: Vertex reader returned a vertex " +
-                "without an id!  - " + readerVertex);
-      }
-      if (readerVertex.getValue() == null) {
-        readerVertex.setValue(configuration.createVertexValue());
-      }
-      readerVertex.setConf(configuration);
-      readerVertex.setGraphState(graphState);
-
-      PartitionOwner partitionOwner =
-          bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
-      graphState.getWorkerClientRequestProcessor().sendVertexRequest(
-          partitionOwner, readerVertex);
-      context.progress(); // do this before potential data transfer
-      ++inputSplitVerticesLoaded;
-      inputSplitEdgesLoaded += readerVertex.getNumEdges();
-
-      // Update status every 250k vertices
-      if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
-        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
-            "readInputSplit: Loaded " +
-                (inputSplitVerticesLoaded + totalVerticesLoaded) +
-                " vertices " +
-                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
-                MemoryUtils.getRuntimeMemoryStats());
-      }
-
-      // For sampling, or to limit outlier input splits, the number of
-      // records per input split can be limited
-      if (inputSplitMaxVertices > 0 &&
-          inputSplitVerticesLoaded >= inputSplitMaxVertices) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("readInputSplit: Leaving the input " +
-              "split early, reached maximum vertices " +
-              inputSplitVerticesLoaded);
-        }
-        break;
-      }
-    }
-    vertexReader.close();
-    totalVerticesLoaded += inputSplitVerticesLoaded;
-    verticesLoadedCounter.inc(inputSplitVerticesLoaded);
-    totalEdgesLoaded += inputSplitEdgesLoaded;
-    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
-    return new VertexEdgeCount(
-        inputSplitVerticesLoaded, inputSplitEdgesLoaded);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
deleted file mode 100644
index a55c4b0..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import java.util.List;
-
-/**
- * Factory for {@link VertexInputSplitsCallable}s.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class VertexInputSplitsCallableFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements InputSplitsCallableFactory<I, V, E, M> {
-  /** Mapper context. */
-  private final Mapper<?, ?, ?, ?>.Context context;
-  /** Graph state. */
-  private final GraphState<I, V, E, M> graphState;
-  /** Configuration. */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-  /** {@link BspServiceWorker} we're running on. */
-  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
-  /** List of input split paths. */
-  private final List<String> inputSplitPathList;
-  /** Worker info. */
-  private final WorkerInfo workerInfo;
-  /** {@link ZooKeeperExt} for this worker. */
-  private final ZooKeeperExt zooKeeperExt;
-
-  /**
-   * Constructor.
-   *
-   * @param context Mapper context
-   * @param graphState Graph state
-   * @param configuration Configuration
-   * @param bspServiceWorker Calling {@link BspServiceWorker}
-   * @param inputSplitPathList List of input split paths
-   * @param workerInfo Worker info
-   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
-   */
-  public VertexInputSplitsCallableFactory(
-      Mapper<?, ?, ?, ?>.Context context,
-      GraphState<I, V, E, M> graphState,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      BspServiceWorker<I, V, E, M> bspServiceWorker,
-      List<String> inputSplitPathList,
-      WorkerInfo workerInfo,
-      ZooKeeperExt zooKeeperExt) {
-    this.context = context;
-    this.graphState = graphState;
-    this.configuration = configuration;
-    this.bspServiceWorker = bspServiceWorker;
-    this.inputSplitPathList = inputSplitPathList;
-    this.workerInfo = workerInfo;
-    this.zooKeeperExt = zooKeeperExt;
-  }
-
-  @Override
-  public InputSplitsCallable<I, V, E, M> newCallable() {
-    return new VertexInputSplitsCallable<I, V, E, M>(
-        context,
-        graphState,
-        configuration,
-        bspServiceWorker,
-        inputSplitPathList,
-        workerInfo,
-        zooKeeperExt);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
index 4548cea..d9be165 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.json.JSONException;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java
deleted file mode 100644
index d4c8e1c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Implement to output the graph after the computation.  It is modeled
- * directly after the Hadoop OutputFormat.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class VertexOutputFormat<
-    I extends WritableComparable, V extends Writable, E extends Writable> {
-  /**
-   * Create a vertex writer for a given split. The framework will call
-   * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
-   * the split is used.
-   *
-   * @param context the information about the task
-   * @return a new vertex writer
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public abstract VertexWriter<I, V, E> createVertexWriter(
-    TaskAttemptContext context) throws IOException, InterruptedException;
-
-  /**
-   * Check for validity of the output-specification for the job.
-   * (Copied from Hadoop OutputFormat)
-   *
-   * <p>This is to validate the output specification for the job when it is
-   * a job is submitted.  Typically checks that it does not already exist,
-   * throwing an exception when it already exists, so that output is not
-   * overwritten.</p>
-   *
-   * @param context information about the job
-   * @throws IOException when output should not be attempted
-   */
-  public abstract void checkOutputSpecs(JobContext context)
-    throws IOException, InterruptedException;
-
-  /**
-   * Get the output committer for this output format. This is responsible
-   * for ensuring the output is committed correctly.
-   * (Copied from Hadoop OutputFormat)
-   *
-   * @param context the task context
-   * @return an output committer
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public abstract OutputCommitter getOutputCommitter(
-    TaskAttemptContext context) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
deleted file mode 100644
index 77801cc..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Analogous to {@link RecordReader} for vertices.  Will read the vertices
- * from an input split.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public interface VertexReader<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Use the input split and context to setup reading the vertices.
-   * Guaranteed to be called prior to any other function.
-   *
-   * @param inputSplit Input split to be used for reading vertices.
-   * @param context Context from the task.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void initialize(InputSplit inputSplit, TaskAttemptContext context)
-    throws IOException, InterruptedException;
-
-  /**
-   *
-   * @return false iff there are no more vertices
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  boolean nextVertex() throws IOException, InterruptedException;
-
-  /**
-   * Get the current vertex.
-   *
-   * @return the current vertex which has been read.
-   *         nextVertex() should be called first.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  Vertex<I, V, E, M> getCurrentVertex()
-    throws IOException, InterruptedException;
-
-  /**
-   * Close this {@link VertexReader} to future operations.
-   *
-   * @throws IOException
-   */
-  void close() throws IOException;
-
-  /**
-   * How much of the input has the {@link VertexReader} consumed i.e.
-   * has been processed by?
-   *
-   * @return Progress from <code>0.0</code> to <code>1.0</code>.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  float getProgress() throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
index 1616290..4a36706 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -40,8 +41,7 @@ public interface VertexResolver<I extends WritableComparable,
    * excluding the normal case (a vertex already exists and has zero or more
    * messages sent it to).
    *
-   * @param vertexId Vertex id (can be used for {@link Vertex}'s
-   *        initialize())
+   * @param vertexId Vertex id (can be used for {@link Vertex}'s initialize())
    * @param vertex Original vertex or null if none
    * @param vertexChanges Changes that happened to this vertex or null if none
    * @param hasMessages True iff vertex received messages in the last superstep

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
deleted file mode 100644
index 804d23e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Vertex input format that only allows setting vertex id and value. It can
- * be used in conjunction with {@link EdgeInputFormat}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-public abstract class VertexValueInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends VertexInputFormat<I, V, E, M> {
-  /**
-   * Create a {@link VertexValueReader} for a given split. The framework will
-   * call {@link VertexValueReader#initialize(InputSplit,
-   * TaskAttemptContext)} before the split is used.
-   *
-   * @param split The split to be read
-   * @param context The information about the task
-   * @return A new vertex value reader
-   * @throws IOException
-   */
-  public abstract VertexValueReader<I, V, E, M> createVertexValueReader(
-      InputSplit split, TaskAttemptContext context) throws IOException;
-
-  @Override
-  public final VertexReader<I, V, E, M> createVertexReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
-    return createVertexValueReader(split, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueReader.java
deleted file mode 100644
index 569714d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Vertex reader for {@link VertexValueInputFormat}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-public abstract class VertexValueReader<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends BasicVertexValueReader<I, V, E, M> {
-  /** Configuration. */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
-        context.getConfiguration());
-  }
-
-  @Override
-  public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
-      InterruptedException {
-    Vertex<I, V, E, M> vertex = getConf().createVertex();
-    vertex.initialize(getCurrentVertexId(), getCurrentVertexValue());
-    return vertex;
-  }
-
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-    return configuration;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexWriter.java
deleted file mode 100644
index 7c0e06e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexWriter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Implement to output a vertex range of the graph after the computation
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public interface VertexWriter<I extends WritableComparable, V extends Writable,
-    E extends Writable> {
-  /**
-   * Use the context to setup writing the vertices.
-   * Guaranteed to be called prior to any other function.
-   *
-   * @param context Context used to write the vertices.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void initialize(TaskAttemptContext context) throws IOException,
-    InterruptedException;
-
-  /**
-   * Writes the next vertex and associated data
-   *
-   * @param vertex set the properties of this vertex
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void writeVertex(Vertex<I, V, E, ?> vertex)
-    throws IOException, InterruptedException;
-
-  /**
-   * Close this {@link VertexWriter} to future operations.
-   *
-   * @param context the context of the task
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void close(TaskAttemptContext context)
-    throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
deleted file mode 100644
index 8e35998..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
-import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Handler for aggregators on worker. Provides the aggregated values and
- * performs aggregations from user vertex code (thread-safe). Also has
- * methods for all superstep coordination related to aggregators.
- *
- * At the beginning of any superstep any worker calls prepareSuperstep(),
- * which blocks until the final aggregates from the previous superstep have
- * been delivered to the worker.
- * Next, during the superstep worker can call aggregate() and
- * getAggregatedValue() (both methods are thread safe) the former
- * computes partial aggregates for this superstep from the worker,
- * the latter returns (read-only) final aggregates from the previous superstep.
- * Finally, at the end of the superstep, the worker calls finishSuperstep(),
- * which propagates non-owned partial aggregates to the owner workers,
- * and sends the final aggregate from the owner worker to the master.
- */
-public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(WorkerAggregatorHandler.class);
-  /** Map of values from previous superstep */
-  private Map<String, Writable> previousAggregatedValueMap =
-      Maps.newHashMap();
-  /** Map of aggregators for current superstep */
-  private Map<String, Aggregator<Writable>> currentAggregatorMap =
-      Maps.newHashMap();
-  /** Service worker */
-  private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
-  /** Progressable for reporting progress */
-  private final Progressable progressable;
-  /** How big a single aggregator request can be */
-  private final int maxBytesPerAggregatorRequest;
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
-
-  /**
-   * Constructor
-   *
-   * @param serviceWorker Service worker
-   * @param conf          Giraph configuration
-   * @param progressable  Progressable for reporting progress
-   */
-  public WorkerAggregatorHandler(
-      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
-      ImmutableClassesGiraphConfiguration conf,
-      Progressable progressable) {
-    this.serviceWorker = serviceWorker;
-    this.progressable = progressable;
-    this.conf = conf;
-    maxBytesPerAggregatorRequest = conf.getInt(
-        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
-        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
-  }
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
-    if (aggregator != null) {
-      synchronized (aggregator) {
-        aggregator.aggregate(value);
-      }
-    } else {
-      throw new IllegalStateException("aggregate: Tried to aggregate value " +
-          "to unregistered aggregator " + name);
-    }
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return (A) previousAggregatedValueMap.get(name);
-  }
-
-  /**
-   * Prepare aggregators for current superstep
-   *
-   * @param requestProcessor Request processor for aggregators
-   */
-  public void prepareSuperstep(
-      WorkerAggregatorRequestProcessor requestProcessor) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("prepareSuperstep: Start preparing aggregators");
-    }
-    AllAggregatorServerData allAggregatorData =
-        serviceWorker.getServerData().getAllAggregatorData();
-    // Wait for my aggregators
-    Iterable<byte[]> dataToDistribute =
-        allAggregatorData.getDataFromMasterWhenReady();
-    try {
-      // Distribute my aggregators
-      requestProcessor.distributeAggregators(dataToDistribute);
-    } catch (IOException e) {
-      throw new IllegalStateException("prepareSuperstep: " +
-          "IOException occurred while trying to distribute aggregators", e);
-    }
-    // Wait for all other aggregators and store them
-    allAggregatorData.fillNextSuperstepMapsWhenReady(
-        serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap,
-        currentAggregatorMap);
-    allAggregatorData.reset();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("prepareSuperstep: Aggregators prepared");
-    }
-  }
-
-  /**
-   * Send aggregators to their owners and in the end to the master
-   *
-   * @param requestProcessor Request processor for aggregators
-   */
-  public void finishSuperstep(
-      WorkerAggregatorRequestProcessor requestProcessor) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Start finishing aggregators");
-    }
-    OwnerAggregatorServerData ownerAggregatorData =
-        serviceWorker.getServerData().getOwnerAggregatorData();
-    // First send partial aggregated values to their owners and determine
-    // which aggregators belong to this worker
-    for (Map.Entry<String, Aggregator<Writable>> entry :
-        currentAggregatorMap.entrySet()) {
-      try {
-        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
-            entry.getValue().getAggregatedValue());
-        if (!sent) {
-          // If it's my aggregator, add it directly
-          ownerAggregatorData.aggregate(entry.getKey(),
-              entry.getValue().getAggregatedValue());
-        }
-      } catch (IOException e) {
-        throw new IllegalStateException("finishSuperstep: " +
-            "IOException occurred while sending aggregator " +
-            entry.getKey() + " to its owner", e);
-      }
-      progressable.progress();
-    }
-    try {
-      // Flush
-      requestProcessor.flush();
-    } catch (IOException e) {
-      throw new IllegalStateException("finishSuperstep: " +
-          "IOException occurred while sending aggregators to owners", e);
-    }
-
-    // Wait to receive partial aggregated values from all other workers
-    Iterable<Map.Entry<String, Writable>> myAggregators =
-        ownerAggregatorData.getMyAggregatorValuesWhenReady(
-            serviceWorker.getWorkerInfoList().size());
-
-    // Send final aggregated values to master
-    AggregatedValueOutputStream aggregatorOutput =
-        new AggregatedValueOutputStream();
-    for (Map.Entry<String, Writable> entry : myAggregators) {
-      try {
-        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
-            entry.getValue());
-        if (currentSize > maxBytesPerAggregatorRequest) {
-          requestProcessor.sendAggregatedValuesToMaster(
-              aggregatorOutput.flush());
-        }
-        progressable.progress();
-      } catch (IOException e) {
-        throw new IllegalStateException("finishSuperstep: " +
-            "IOException occurred while writing aggregator " +
-            entry.getKey(), e);
-      }
-    }
-    try {
-      requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
-    } catch (IOException e) {
-      throw new IllegalStateException("finishSuperstep: " +
-          "IOException occured while sending aggregators to master", e);
-    }
-    // Wait for master to receive aggregated values before proceeding
-    serviceWorker.getWorkerClient().waitAllRequests();
-
-    ownerAggregatorData.reset();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Aggregators finished");
-    }
-  }
-
-  /**
-   * Create new aggregator usage which will be used by one of the compute
-   * threads.
-   *
-   * @return New aggregator usage
-   */
-  public WorkerThreadAggregatorUsage newThreadAggregatorUsage() {
-    if (AggregatorUtils.useThreadLocalAggregators(conf)) {
-      return new ThreadLocalWorkerAggregatorUsage();
-    } else {
-      return this;
-    }
-  }
-
-  @Override
-  public void finishThreadComputation() {
-    // If we don't use thread-local aggregators, all the aggregated values
-    // are already in this object
-  }
-
-  /**
-   * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
-   * We can use one instance of this object per thread to prevent
-   * synchronizing on each aggregate() call. In the end of superstep,
-   * values from each of these will be aggregated back to {@link
-   * WorkerAggregatorHandler}
-   */
-  public class ThreadLocalWorkerAggregatorUsage
-      implements WorkerThreadAggregatorUsage {
-    /** Thread-local aggregator map */
-    private final Map<String, Aggregator<Writable>> threadAggregatorMap;
-
-    /**
-     * Constructor
-     *
-     * Creates new instances of all aggregators from
-     * {@link WorkerAggregatorHandler}
-     */
-    public ThreadLocalWorkerAggregatorUsage() {
-      threadAggregatorMap = Maps.newHashMapWithExpectedSize(
-          WorkerAggregatorHandler.this.currentAggregatorMap.size());
-      for (Map.Entry<String, Aggregator<Writable>> entry :
-          WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
-        threadAggregatorMap.put(entry.getKey(),
-            AggregatorUtils.newAggregatorInstance(
-                (Class<Aggregator<Writable>>) entry.getValue().getClass()));
-      }
-    }
-
-    @Override
-    public <A extends Writable> void aggregate(String name, A value) {
-      Aggregator<Writable> aggregator = threadAggregatorMap.get(name);
-      if (aggregator != null) {
-        aggregator.aggregate(value);
-      } else {
-        throw new IllegalStateException("aggregate: " +
-            "Tried to aggregate value to unregistered aggregator " + name);
-      }
-    }
-
-    @Override
-    public <A extends Writable> A getAggregatedValue(String name) {
-      return WorkerAggregatorHandler.this.<A>getAggregatedValue(name);
-    }
-
-    @Override
-    public void finishThreadComputation() {
-      // Aggregate the values this thread's vertices provided back to
-      // WorkerAggregatorHandler
-      for (Map.Entry<String, Aggregator<Writable>> entry :
-          threadAggregatorMap.entrySet()) {
-        WorkerAggregatorHandler.this.aggregate(entry.getKey(),
-            entry.getValue().getAggregatedValue());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
deleted file mode 100644
index b7b98cd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Vertex classes can access and change aggregators through this interface
- */
-public interface WorkerAggregatorUsage {
-  /**
-   * Add a new value
-   *
-   * @param name Name of aggregator
-   * @param value Value to add
-   * @param <A> Aggregated value
-   */
-  <A extends Writable> void aggregate(String name, A value);
-
-  /**
-   * Get value of an aggregator.
-   *
-   * @param name Name of aggregator
-   * @param <A> Aggregated value
-   * @return Value of the aggregator
-   */
-  <A extends Writable> A getAggregatedValue(String name);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerContext.java
deleted file mode 100644
index 5f5f6fa..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerContext.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * WorkerContext allows for the execution of user code
- * on a per-worker basis. There's one WorkerContext per worker.
- */
-@SuppressWarnings("rawtypes")
-public abstract class WorkerContext implements WorkerAggregatorUsage {
-  /** Global graph state */
-  private GraphState graphState;
-
-  /**
-   * Set the graph state.
-   *
-   *  @param graphState Used to set the graph state.
-   */
-  public void setGraphState(GraphState graphState) {
-    this.graphState = graphState;
-  }
-
-  /**
-   * Initialize the WorkerContext.
-   * This method is executed once on each Worker before the first
-   * superstep starts.
-   *
-   * @throws IllegalAccessException Thrown for getting the class
-   * @throws InstantiationException Expected instantiation in this method.
-   */
-  public abstract void preApplication() throws InstantiationException,
-    IllegalAccessException;
-
-  /**
-   * Finalize the WorkerContext.
-   * This method is executed once on each Worker after the last
-   * superstep ends.
-   */
-  public abstract void postApplication();
-
-  /**
-   * Execute user code.
-   * This method is executed once on each Worker before each
-   * superstep starts.
-   */
-  public abstract void preSuperstep();
-
-  /**
-   * Execute user code.
-   * This method is executed once on each Worker after each
-   * superstep ends.
-   */
-  public abstract void postSuperstep();
-
-  /**
-   * Retrieves the current superstep.
-   *
-   * @return Current superstep
-   */
-  public long getSuperstep() {
-    return graphState.getSuperstep();
-  }
-
-  /**
-   * Get the total (all workers) number of vertices that
-   * existed in the previous superstep.
-   *
-   * @return Total number of vertices (-1 if first superstep)
-   */
-  public long getTotalNumVertices() {
-    return graphState.getTotalNumVertices();
-  }
-
-  /**
-   * Get the total (all workers) number of edges that
-   * existed in the previous superstep.
-   *
-   * @return Total number of edges (-1 if first superstep)
-   */
-  public long getTotalNumEdges() {
-    return graphState.getTotalNumEdges();
-  }
-
-  /**
-   * Get the mapper context
-   *
-   * @return Mapper context
-   */
-  public Mapper.Context getContext() {
-    return graphState.getContext();
-  }
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    graphState.getWorkerAggregatorUsage().aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return graphState.getWorkerAggregatorUsage().<A>getAggregatedValue(name);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerInfo.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerInfo.java
deleted file mode 100644
index c9571ba..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerInfo.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Information about a worker that is sent to the master and other workers.
- */
-public class WorkerInfo extends TaskInfo {
-  /**
-   * Constructor for reflection
-   */
-  public WorkerInfo() {
-  }
-
-  @Override
-  public String toString() {
-    return "Worker(" + super.toString() + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java
deleted file mode 100644
index e36ff48..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * {@link WorkerAggregatorUsage} which can be used in each of the
- * computation threads.
- */
-public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage {
-  /**
-   * Call this after thread's computation is finished,
-   * i.e. when all vertices have provided their values to aggregators
-   */
-  void finishThreadComputation();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
deleted file mode 100644
index e063ac7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.WorkerInfo;
-
-/**
- * Basic partition owner, can be subclassed for more complicated partition
- * owner implementations.
- */
-public class BasicPartitionOwner implements PartitionOwner,
-    ImmutableClassesGiraphConfigurable {
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-  /** Partition id */
-  private int partitionId = -1;
-  /** Owning worker information */
-  private WorkerInfo workerInfo;
-  /** Previous (if any) worker info */
-  private WorkerInfo previousWorkerInfo;
-  /** Checkpoint files prefix for this partition */
-  private String checkpointFilesPrefix;
-
-  /**
-   * Default constructor.
-   */
-  public BasicPartitionOwner() { }
-
-  /**
-   * Constructor with partition id and worker info.
-   *
-   * @param partitionId Partition id of this partition.
-   * @param workerInfo Owner of the partition.
-   */
-  public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
-    this(partitionId, workerInfo, null, null);
-  }
-
-  /**
-   * Constructor with partition id and worker info.
-   *
-   * @param partitionId Partition id of this partition.
-   * @param workerInfo Owner of the partition.
-   * @param previousWorkerInfo Previous owner of this partition.
-   * @param checkpointFilesPrefix Prefix of the checkpoint files.
-   */
-  public BasicPartitionOwner(int partitionId,
-                             WorkerInfo workerInfo,
-                             WorkerInfo previousWorkerInfo,
-                             String checkpointFilesPrefix) {
-    this.partitionId = partitionId;
-    this.workerInfo = workerInfo;
-    this.previousWorkerInfo = previousWorkerInfo;
-    this.checkpointFilesPrefix = checkpointFilesPrefix;
-  }
-
-  @Override
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  @Override
-  public WorkerInfo getWorkerInfo() {
-    return workerInfo;
-  }
-
-  @Override
-  public void setWorkerInfo(WorkerInfo workerInfo) {
-    this.workerInfo = workerInfo;
-  }
-
-  @Override
-  public WorkerInfo getPreviousWorkerInfo() {
-    return previousWorkerInfo;
-  }
-
-  @Override
-  public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
-    this.previousWorkerInfo = workerInfo;
-  }
-
-  @Override
-  public String getCheckpointFilesPrefix() {
-    return checkpointFilesPrefix;
-  }
-
-  @Override
-  public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
-    this.checkpointFilesPrefix = checkpointFilesPrefix;
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    partitionId = input.readInt();
-    workerInfo = new WorkerInfo();
-    workerInfo.readFields(input);
-    boolean hasPreviousWorkerInfo = input.readBoolean();
-    if (hasPreviousWorkerInfo) {
-      previousWorkerInfo = new WorkerInfo();
-      previousWorkerInfo.readFields(input);
-    }
-    boolean hasCheckpointFilePrefix = input.readBoolean();
-    if (hasCheckpointFilePrefix) {
-      checkpointFilesPrefix = input.readUTF();
-    }
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeInt(partitionId);
-    workerInfo.write(output);
-    if (previousWorkerInfo != null) {
-      output.writeBoolean(true);
-      previousWorkerInfo.write(output);
-    } else {
-      output.writeBoolean(false);
-    }
-    if (checkpointFilesPrefix != null) {
-      output.writeBoolean(true);
-      output.writeUTF(checkpointFilesPrefix);
-    } else {
-      output.writeBoolean(false);
-    }
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public String toString() {
-    return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
-        previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
deleted file mode 100644
index 5460a55..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph.partition;
-
-import com.google.common.collect.MapMaker;
-
-import com.google.common.primitives.Ints;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.utils.UnsafeByteArrayInputStream;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-/**
- * Byte array based partition.  Should reduce the amount of memory used since
- * the entire graph is compressed into byte arrays.  Must guarantee, however,
- * that only one thread at a time will call getVertex since it is a singleton.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class ByteArrayPartition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements Partition<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
-  /** Configuration from the worker */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Partition id */
-  private int id;
-  /**
-   * Vertex map for this range (keyed by index).  Note that the byte[] is a
-   * serialized vertex with the first four bytes as the length of the vertex
-   * to read.
-   */
-  private ConcurrentMap<I, byte[]> vertexMap;
-  /** Context used to report progress */
-  private Progressable progressable;
-  /** Representative vertex */
-  private Vertex<I, V, E, M> representativeVertex;
-  /** Use unsafe serialization */
-  private boolean useUnsafeSerialization;
-
-  /**
-   * Constructor for reflection.
-   */
-  public ByteArrayPartition() { }
-
-  @Override
-  public void initialize(int partitionId, Progressable progressable) {
-    setId(partitionId);
-    setProgressable(progressable);
-    vertexMap = new MapMaker().concurrencyLevel(
-        conf.getNettyServerExecutionConcurrency()).makeMap();
-    representativeVertex = conf.createVertex();
-    useUnsafeSerialization = conf.useUnsafeSerialization();
-  }
-
-  @Override
-  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
-    byte[] vertexData = vertexMap.get(vertexIndex);
-    if (vertexData == null) {
-      return null;
-    }
-    WritableUtils.readFieldsFromByteArrayWithSize(
-        vertexData, representativeVertex, useUnsafeSerialization);
-    return representativeVertex;
-  }
-
-  @Override
-  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
-    byte[] vertexData =
-        WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization);
-    byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
-    if (oldVertexBytes == null) {
-      return null;
-    } else {
-      WritableUtils.readFieldsFromByteArrayWithSize(
-          oldVertexBytes, representativeVertex, useUnsafeSerialization);
-      return representativeVertex;
-    }
-  }
-
-  @Override
-  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
-    byte[] vertexBytes = vertexMap.remove(vertexIndex);
-    if (vertexBytes == null) {
-      return null;
-    }
-    WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
-        representativeVertex, useUnsafeSerialization);
-    return representativeVertex;
-  }
-
-  @Override
-  public void addPartition(Partition<I, V, E, M> partition) {
-    // Only work with other ByteArrayPartition instances
-    if (!(partition instanceof ByteArrayPartition)) {
-      throw new IllegalStateException("addPartition: Cannot add partition " +
-          "of type " + partition.getClass());
-    }
-
-    ByteArrayPartition<I, V, E, M> byteArrayPartition =
-        (ByteArrayPartition<I, V, E, M>) partition;
-    for (Map.Entry<I, byte[]> entry :
-        byteArrayPartition.vertexMap.entrySet()) {
-      vertexMap.put(entry.getKey(), entry.getValue());
-    }
-  }
-
-  @Override
-  public long getVertexCount() {
-    return vertexMap.size();
-  }
-
-  @Override
-  public long getEdgeCount() {
-    long edges = 0;
-    for (byte[] vertexBytes : vertexMap.values()) {
-      WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
-          representativeVertex, useUnsafeSerialization);
-      edges += representativeVertex.getNumEdges();
-    }
-    return edges;
-  }
-
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  @Override
-  public void setProgressable(Progressable progressable) {
-    this.progressable = progressable;
-  }
-
-  @Override
-  public void saveVertex(Vertex<I, V, E, M> vertex) {
-    // Reuse the old buffer whenever possible
-    byte[] oldVertexData = vertexMap.get(vertex.getId());
-    if (oldVertexData != null) {
-      vertexMap.put(vertex.getId(),
-          WritableUtils.writeToByteArrayWithSize(
-              vertex, oldVertexData, useUnsafeSerialization));
-    } else {
-      vertexMap.put(vertex.getId(),
-          WritableUtils.writeToByteArrayWithSize(
-              vertex, useUnsafeSerialization));
-    }
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeInt(id);
-    output.writeInt(vertexMap.size());
-    for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
-      if (progressable != null) {
-        progressable.progress();
-      }
-      entry.getKey().write(output);
-      // Note here that we are writing the size of the vertex data first
-      // as it is encoded in the first four bytes of the byte[]
-      int vertexDataSize;
-      if (useUnsafeSerialization) {
-        vertexDataSize = UnsafeByteArrayInputStream.getInt(entry.getValue(),
-            0);
-      } else {
-        vertexDataSize = Ints.fromByteArray(entry.getValue());
-      }
-
-      output.writeInt(vertexDataSize);
-      output.write(entry.getValue(), 0, vertexDataSize);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    id = input.readInt();
-    int size = input.readInt();
-    vertexMap = new MapMaker().concurrencyLevel(
-        conf.getNettyServerExecutionConcurrency()).initialCapacity(
-        size).makeMap();
-    representativeVertex = conf.createVertex();
-    useUnsafeSerialization = conf.useUnsafeSerialization();
-    for (int i = 0; i < size; ++i) {
-      if (progressable != null) {
-        progressable.progress();
-      }
-      I vertexId = conf.createVertexId();
-      vertexId.readFields(input);
-      int vertexDataSize = input.readInt();
-      byte[] vertexData = new byte[vertexDataSize];
-      input.readFully(vertexData);
-      if (vertexMap.put(vertexId, vertexData) != null) {
-        throw new IllegalStateException("readFields: Already saw vertex " +
-            vertexId);
-      }
-    }
-  }
-
-  @Override
-  public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
-    conf = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-    return conf;
-  }
-
-  @Override
-  public Iterator<Vertex<I, V, E, M>> iterator() {
-    return new RepresentativeVertexIterator();
-  }
-
-  /**
-   * Iterator that deserializes a vertex from a byte array on the fly, using
-   * the same representative vertex object.
-   */
-  private class RepresentativeVertexIterator implements
-      Iterator<Vertex<I, V, E, M>> {
-    /** Iterator to the vertex values */
-    private Iterator<byte[]> vertexDataIterator =
-        vertexMap.values().iterator();
-
-    @Override
-    public boolean hasNext() {
-      return vertexDataIterator.hasNext();
-    }
-
-    @Override
-    public Vertex<I, V, E, M> next() {
-      WritableUtils.readFieldsFromByteArrayWithSize(
-          vertexDataIterator.next(), representativeVertex,
-          useUnsafeSerialization);
-      return representativeVertex;
-    }
-
-    @Override
-    public void remove() {
-      throw new IllegalAccessError("remove: This method is not supported.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
deleted file mode 100644
index 1e483d3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A partition store that can possibly spill to disk.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class DiskBackedPartitionStore<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends PartitionStore<I, V, E, M> {
-  /** Class logger. */
-  private static final Logger LOG =
-      Logger.getLogger(DiskBackedPartitionStore.class);
-  /** Map of partitions kept in memory. */
-  private final ConcurrentMap<Integer, Partition<I, V, E, M>>
-  inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>();
-  /** Maximum number of partitions to keep in memory. */
-  private int maxInMemoryPartitions;
-  /** Map of partitions kept out-of-core. The values are partition sizes. */
-  private final ConcurrentMap<Integer, Integer> onDiskPartitions =
-      Maps.newConcurrentMap();
-  /** Directory on the local file system for storing out-of-core partitions. */
-  private final String basePath;
-  /** Configuration. */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Slot for loading out-of-core partitions. */
-  private Partition<I, V, E, M> loadedPartition;
-  /** Locks for accessing and modifying partitions. */
-  private final ConcurrentMap<Integer, Lock> partitionLocks =
-      Maps.newConcurrentMap();
-  /** Context used to report progress */
-  private final Mapper<?, ?, ?, ?>.Context context;
-
-  /**
-   * Constructor.
-   *
-   * @param conf Configuration
-   * @param context Mapper context
-   */
-  public DiskBackedPartitionStore(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
-      Mapper<?, ?, ?, ?>.Context context) {
-    this.conf = conf;
-    this.context = context;
-    // We must be able to hold at least one partition in memory
-    maxInMemoryPartitions = Math.max(1,
-        conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
-            GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
-    basePath = conf.get("mapred.job.id", "Unknown Job") +
-        conf.get(GiraphConstants.PARTITIONS_DIRECTORY,
-            GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
-  }
-
-  /**
-   * Get the path to the file where a partition is stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the given partition
-   */
-  private String getPartitionPath(Integer partitionId) {
-    return basePath + "/partition-" + partitionId;
-  }
-
-  /**
-   * Create a new lock for a partition, lock it, and return it. If already
-   * existing, return null.
-   *
-   * @param partitionId Partition id
-   * @return A newly created lock, or null if already present
-   */
-  private Lock createLock(Integer partitionId) {
-    Lock lock = new ReentrantLock(true);
-    lock.lock();
-    if (partitionLocks.putIfAbsent(partitionId, lock) != null) {
-      return null;
-    }
-    return lock;
-  }
-
-  /**
-   * Get the lock for a partition id.
-   *
-   * @param partitionId Partition id
-   * @return The lock
-   */
-  private Lock getLock(Integer partitionId) {
-    return partitionLocks.get(partitionId);
-  }
-
-  /**
-   * Write a partition to disk.
-   *
-   * @param partition The partition object to write
-   * @throws java.io.IOException
-   */
-  private void writePartition(Partition<I, V, E, M> partition)
-    throws IOException {
-    File file = new File(getPartitionPath(partition.getId()));
-    file.getParentFile().mkdirs();
-    file.createNewFile();
-    DataOutputStream outputStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(file)));
-    for (Vertex<I, V, E, M> vertex : partition) {
-      vertex.write(outputStream);
-    }
-    outputStream.close();
-  }
-
-  /**
-   * Read a partition from disk.
-   *
-   * @param partitionId Id of the partition to read
-   * @return The partition object
-   * @throws IOException
-   */
-  private Partition<I, V, E, M> readPartition(Integer partitionId)
-    throws IOException {
-    Partition<I, V, E, M> partition =
-        conf.createPartition(partitionId, context);
-    File file = new File(getPartitionPath(partitionId));
-    DataInputStream inputStream = new DataInputStream(
-        new BufferedInputStream(new FileInputStream(file)));
-    int numVertices = onDiskPartitions.get(partitionId);
-    for (int i = 0; i < numVertices; ++i) {
-      Vertex<I, V, E, M> vertex = conf.createVertex();
-      vertex.readFields(inputStream);
-      partition.putVertex(vertex);
-    }
-    inputStream.close();
-    file.delete();
-    return partition;
-  }
-
-  /**
-   * Append some vertices of another partition to an out-of-core partition.
-   *
-   * @param partition Partition to add
-   * @throws IOException
-   */
-  private void appendPartitionOutOfCore(Partition<I, V, E, M> partition)
-    throws IOException {
-    File file = new File(getPartitionPath(partition.getId()));
-    DataOutputStream outputStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(file, true)));
-    for (Vertex<I, V, E, M> vertex : partition) {
-      vertex.write(outputStream);
-    }
-    outputStream.close();
-  }
-
-  /**
-   * Load an out-of-core partition in memory.
-   *
-   * @param partitionId Partition id
-   */
-  private void loadPartition(Integer partitionId) {
-    if (loadedPartition != null) {
-      if (loadedPartition.getId() == partitionId) {
-        return;
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
-            " out of core with size " + loadedPartition.getVertexCount());
-      }
-      try {
-        writePartition(loadedPartition);
-        onDiskPartitions.put(loadedPartition.getId(),
-            (int) loadedPartition.getVertexCount());
-        loadedPartition = null;
-      } catch (IOException e) {
-        throw new IllegalStateException("loadPartition: failed writing " +
-            "partition " + loadedPartition.getId() + " to disk", e);
-      }
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadPartition: loading partition " + partitionId +
-          " in memory");
-    }
-    try {
-      loadedPartition = readPartition(partitionId);
-    } catch (IOException e) {
-      throw new IllegalStateException("loadPartition: failed reading " +
-          "partition " + partitionId + " from disk");
-    }
-  }
-
-  /**
-   * Add a new partition without requiring a lock.
-   *
-   * @param partition Partition to be added
-   */
-  private void addPartitionNoLock(Partition<I, V, E, M> partition) {
-    synchronized (inMemoryPartitions) {
-      if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) {
-        inMemoryPartitions.put(partition.getId(), partition);
-
-        return;
-      }
-    }
-    try {
-      writePartition(partition);
-      onDiskPartitions.put(partition.getId(),
-          (int) partition.getVertexCount());
-    } catch (IOException e) {
-      throw new IllegalStateException("addPartition: failed writing " +
-          "partition " + partition.getId() + "to disk");
-    }
-  }
-
-  @Override
-  public void addPartition(Partition<I, V, E, M> partition) {
-    if (inMemoryPartitions.containsKey(partition.getId())) {
-      Partition<I, V, E, M> existingPartition =
-          inMemoryPartitions.get(partition.getId());
-      existingPartition.addPartition(partition);
-    } else if (onDiskPartitions.containsKey(partition.getId())) {
-      Lock lock = getLock(partition.getId());
-      lock.lock();
-      if (loadedPartition != null && loadedPartition.getId() ==
-          partition.getId()) {
-        loadedPartition.addPartition(partition);
-      } else {
-        try {
-          appendPartitionOutOfCore(partition);
-          onDiskPartitions.put(partition.getId(),
-              onDiskPartitions.get(partition.getId()) +
-                  (int) partition.getVertexCount());
-        } catch (IOException e) {
-          throw new IllegalStateException("addPartition: failed " +
-              "writing vertices to partition " + partition.getId() + " on disk",
-              e);
-        }
-      }
-      lock.unlock();
-    } else {
-      Lock lock = createLock(partition.getId());
-      if (lock != null) {
-        addPartitionNoLock(partition);
-        lock.unlock();
-      } else {
-        // Another thread is already creating the partition,
-        // so we make sure it's done before repeating the call.
-        lock = getLock(partition.getId());
-        lock.lock();
-        lock.unlock();
-        addPartition(partition);
-      }
-    }
-  }
-
-  @Override
-  public Partition<I, V, E, M> getPartition(Integer partitionId) {
-    if (inMemoryPartitions.containsKey(partitionId)) {
-      return inMemoryPartitions.get(partitionId);
-    } else if (onDiskPartitions.containsKey(partitionId)) {
-      loadPartition(partitionId);
-      return loadedPartition;
-    } else {
-      throw new IllegalStateException("getPartition: partition " +
-          partitionId + " does not exist");
-    }
-  }
-
-  @Override
-  public Partition<I, V, E, M> removePartition(Integer partitionId) {
-    partitionLocks.remove(partitionId);
-    if (onDiskPartitions.containsKey(partitionId)) {
-      Partition<I, V, E, M> partition;
-      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
-        partition = loadedPartition;
-        loadedPartition = null;
-      } else {
-        try {
-          partition = readPartition(partitionId);
-        } catch (IOException e) {
-          throw new IllegalStateException("removePartition: failed reading " +
-              "partition " + partitionId + " from disk", e);
-        }
-      }
-      onDiskPartitions.remove(partitionId);
-      return partition;
-    } else {
-      return inMemoryPartitions.remove(partitionId);
-    }
-  }
-
-  @Override
-  public void deletePartition(Integer partitionId) {
-    partitionLocks.remove(partitionId);
-    if (inMemoryPartitions.containsKey(partitionId)) {
-      inMemoryPartitions.remove(partitionId);
-    } else {
-      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
-        loadedPartition = null;
-      } else {
-        File file = new File(getPartitionPath(partitionId));
-        file.delete();
-      }
-      onDiskPartitions.remove(partitionId);
-    }
-  }
-
-  @Override
-  public boolean hasPartition(Integer partitionId) {
-    return partitionLocks.containsKey(partitionId);
-  }
-
-  @Override
-  public Iterable<Integer> getPartitionIds() {
-    return Iterables.concat(inMemoryPartitions.keySet(),
-        onDiskPartitions.keySet());
-  }
-
-  @Override
-  public int getNumPartitions() {
-    return partitionLocks.size();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
deleted file mode 100644
index a7ac84b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Defines the partitioning framework for this application.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public interface GraphPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    ImmutableClassesGiraphConfigurable {
-  /**
-   * Create the {@link MasterGraphPartitioner} used by the master.
-   * Instantiated once by the master and reused.
-   *
-   * @return Instantiated master graph partitioner
-   */
-  MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
-
-  /**
-   * Create the {@link WorkerGraphPartitioner} used by the worker.
-   * Instantiated once by every worker and reused.
-   *
-   * @return Instantiated worker graph partitioner
-   */
-  WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
deleted file mode 100644
index 6724e50..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Master will execute a hash based partitioning.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public class HashMasterPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    MasterGraphPartitioner<I, V, E, M> {
-  /** Multiplier for the current workers squared */
-  public static final String PARTITION_COUNT_MULTIPLIER =
-    "hash.masterPartitionCountMultipler";
-  /** Default mulitplier for current workers squared */
-  public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
-  /** Overrides default partition count calculation if not -1 */
-  public static final String USER_PARTITION_COUNT =
-    "hash.userPartitionCount";
-  /** Default user partition count */
-  public static final int DEFAULT_USER_PARTITION_COUNT = -1;
-  /** Class logger */
-  private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
-  /**
-   * ZooKeeper has a limit of the data in a single znode of 1 MB and
-   * each entry can go be on the average somewhat more than 300 bytes
-   */
-  private static final int MAX_PARTTIONS = 1024 * 1024 / 350;
-  /** Provided configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-  /** Specified partition count (overrides calculation) */
-  private final int userPartitionCount;
-  /** Partition count (calculated in createInitialPartitionOwners) */
-  private int partitionCount = -1;
-  /** Save the last generated partition owner list */
-  private List<PartitionOwner> partitionOwnerList;
-
-  /**
-   * Constructor.
-   *
-   *@param conf Configuration used.
-   */
-  public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-    userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
-        DEFAULT_USER_PARTITION_COUNT);
-  }
-
-  @Override
-  public Collection<PartitionOwner> createInitialPartitionOwners(
-      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
-    if (availableWorkerInfos.isEmpty()) {
-      throw new IllegalArgumentException(
-          "createInitialPartitionOwners: No available workers");
-    }
-    List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
-    Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
-    if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
-      float multiplier = conf.getFloat(
-          PARTITION_COUNT_MULTIPLIER,
-          DEFAULT_PARTITION_COUNT_MULTIPLIER);
-      partitionCount =
-          Math.max((int) (multiplier * availableWorkerInfos.size() *
-              availableWorkerInfos.size()),
-              1);
-    } else {
-      partitionCount = userPartitionCount;
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("createInitialPartitionOwners: Creating " +
-        partitionCount + ", default would have been " +
-        (availableWorkerInfos.size() *
-         availableWorkerInfos.size()) + " partitions.");
-    }
-    if (partitionCount > MAX_PARTTIONS) {
-      LOG.warn("createInitialPartitionOwners: " +
-          "Reducing the partitionCount to " + MAX_PARTTIONS +
-          " from " + partitionCount);
-      partitionCount = MAX_PARTTIONS;
-    }
-
-    for (int i = 0; i < partitionCount; ++i) {
-      PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
-      if (!workerIt.hasNext()) {
-        workerIt = availableWorkerInfos.iterator();
-      }
-      ownerList.add(owner);
-    }
-    this.partitionOwnerList = ownerList;
-    return ownerList;
-  }
-
-  @Override
-  public Collection<PartitionOwner> getCurrentPartitionOwners() {
-    return partitionOwnerList;
-  }
-
-  /**
-   * Subclasses can set the partition owner list.
-   *
-   * @param partitionOwnerList New partition owner list.
-   */
-  protected void setPartitionOwnerList(List<PartitionOwner>
-  partitionOwnerList) {
-    this.partitionOwnerList = partitionOwnerList;
-  }
-
-  @Override
-  public Collection<PartitionOwner> generateChangedPartitionOwners(
-      Collection<PartitionStats> allPartitionStatsList,
-      Collection<WorkerInfo> availableWorkerInfos,
-      int maxWorkers,
-      long superstep) {
-    return PartitionBalancer.balancePartitionsAcrossWorkers(
-        conf,
-        partitionOwnerList,
-        allPartitionStatsList,
-        availableWorkerInfos);
-  }
-
-  @Override
-  public PartitionStats createPartitionStats() {
-    return new PartitionStats();
-  }
-}