You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC

[13/23] GIRAPH-409: Refactor / cleanups (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
deleted file mode 100644
index 3b047f3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * VertexReader that readers lines of text with vertices encoded as adjacency
- * lists and converts each token to the correct type.  For example, a graph
- * with vertices as integers and values as doubles could be encoded as:
- *   1 0.1 2 0.2 3 0.3
- * to represent a vertex named 1, with 0.1 as its value and two edges, to
- * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class AdjacencyListTextVertexInputFormat<I extends
-    WritableComparable, V extends Writable, E extends Writable, M extends
-    Writable> extends TextVertexInputFormat<I, V, E, M> {
-  /** Delimiter for split */
-  public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
-  /** Default delimiter for split */
-  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-
-  /**
-   * Utility for doing any cleaning of each line before it is tokenized.
-   */
-  public interface LineSanitizer {
-    /**
-     * Clean string s before attempting to tokenize it.
-     *
-     * @param s String to be cleaned.
-     * @return Sanitized string.
-     */
-    String sanitize(String s);
-  }
-
-  @Override
-  public abstract AdjacencyListTextVertexReader createVertexReader(
-      InputSplit split, TaskAttemptContext context);
-
-  /**
-   * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.
-   */
-  protected abstract class AdjacencyListTextVertexReader extends
-    TextVertexReaderFromEachLineProcessed<String[]> {
-    /**
-     * Cached configuration.
-     */
-    private Configuration conf;
-
-    /** Cached delimiter used for split */
-    private String splitValue = null;
-
-    /**
-     * Sanitizer from constructor.
-     */
-    private final LineSanitizer sanitizer;
-
-
-    /**
-     * Constructor without line sanitizer.
-     *
-     */
-    public AdjacencyListTextVertexReader() {
-      this(null);
-    }
-
-    /**
-     * Constructor with line sanitizer.
-     *
-     * @param sanitizer Sanitizer to be used.
-     */
-    public AdjacencyListTextVertexReader(LineSanitizer sanitizer) {
-      this.sanitizer = sanitizer;
-    }
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      super.initialize(inputSplit, context);
-      conf = context.getConfiguration();
-      splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
-    }
-
-    @Override
-    protected String[] preprocessLine(Text line) throws IOException {
-      String sanitizedLine;
-      if (sanitizer != null) {
-        sanitizedLine = sanitizer.sanitize(line.toString());
-      } else {
-        sanitizedLine = line.toString();
-      }
-      String [] values = sanitizedLine.split(splitValue);
-      if ((values.length < 2) || (values.length % 2 != 0)) {
-        throw new IllegalArgumentException(
-          "Line did not split correctly: " + line);
-      }
-      return values;
-    }
-
-    @Override
-    protected I getId(String[] values) throws IOException {
-      return decodeId(values[0]);
-    }
-
-    /**
-     * Decode the id for this line into an instance of its correct type.
-     *
-     * @param s Id of vertex from line
-     * @return Vertex id
-     */
-    public abstract I decodeId(String s);
-
-    @Override
-    protected V getValue(String[] values) throws IOException {
-      return decodeValue(values[1]);
-    }
-
-
-    /**
-     * Decode the value for this line into an instance of its correct type.
-     *
-     * @param s Value from line
-     * @return Vertex value
-     */
-    public abstract V decodeValue(String s);
-
-    @Override
-    protected Iterable<Edge<I, E>> getEdges(String[] values) throws
-        IOException {
-      int i = 2;
-      List<Edge<I, E>> edges = Lists.newLinkedList();
-      while (i < values.length) {
-        edges.add(decodeEdge(values[i], values[i + 1]));
-        i += 2;
-      }
-      return edges;
-    }
-
-    /**
-     * Decode an edge from the line into an instance of a correctly typed Edge
-     *
-     * @param id The edge's id from the line
-     * @param value The edge's value from the line
-     * @return Edge with given target id and value
-     */
-    public abstract Edge<I, E> decodeEdge(String id, String value);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
deleted file mode 100644
index 77abb85..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * OutputFormat to write out the graph nodes as text, value-separated (by
- * tabs, by default).  With the default delimiter, a vertex is written out as:
- *
- * <VertexId><tab><Vertex Value><tab>[<EdgeId><tab><EdgeValue>]+
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends TextVertexOutputFormat<I, V, E> {
-
-  /** Split delimiter */
-  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
-  /** Default split delimiter */
-  public static final String LINE_TOKENIZE_VALUE_DEFAULT =
-    AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE_DEFAULT;
-
-  @Override
-  public AdjacencyListTextVertexWriter createVertexWriter(
-      TaskAttemptContext context) {
-    return new AdjacencyListTextVertexWriter();
-  }
-
-  /**
-   * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
-   */
-  protected class AdjacencyListTextVertexWriter extends
-    TextVertexWriterToEachLine {
-    /** Cached split delimeter */
-    private String delimiter;
-
-    @Override
-    public void initialize(TaskAttemptContext context) throws IOException,
-        InterruptedException {
-      super.initialize(context);
-      delimiter = context.getConfiguration()
-          .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
-    }
-
-    @Override
-    public Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
-      throws IOException {
-      StringBuffer sb = new StringBuffer(vertex.getId().toString());
-      sb.append(delimiter);
-      sb.append(vertex.getValue());
-
-      for (Edge<I, E> edge : vertex.getEdges()) {
-        sb.append(delimiter).append(edge.getTargetVertexId());
-        sb.append(delimiter).append(edge.getValue());
-      }
-
-      return new Text(sb.toString());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/BasicVertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/BasicVertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/io/BasicVertexValueReader.java
new file mode 100644
index 0000000..1ccde39
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/BasicVertexValueReader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for VertexValueReader.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class BasicVertexValueReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements VertexReader<I, V, E, M> {
+  /**
+   * User-defined method to extract the id of the current vertex.
+   *
+   * @return The vertex id
+   * @throws IOException if the implementation fails to read the id
+   * @throws InterruptedException if the implementation is interrupted
+   */
+  public abstract I getCurrentVertexId() throws IOException,
+      InterruptedException;
+
+  /**
+   * User-defined method to extract the value of the current vertex.
+   *
+   * @return The vertex value
+   * @throws IOException if the implementation fails to read the value
+   * @throws InterruptedException if the implementation is interrupted
+   */
+  public abstract V getCurrentVertexValue() throws IOException,
+      InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
new file mode 100644
index 0000000..0d5c43f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Input format for reading single edges.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public abstract class EdgeInputFormat<I extends WritableComparable,
+    E extends Writable> implements GiraphInputFormat {
+  /**
+   * Logically split the edge input for a graph processing application.
+   *
+   * Each {@link InputSplit} is then assigned to a worker for processing.
+   *
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. E.g. a split could
+   * be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
+   * also creates the {@link EdgeReader} to read the {@link InputSplit}.
+   *
+   * Also, the number of workers is a hint given to the developer to try to
+   * intelligently determine how many splits to create (if this is
+   * adjustable) at runtime.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job
+   * @return a list of {@link InputSplit}s for the job.
+   */
+  @Override
+  public abstract List<InputSplit> getSplits(
+      JobContext context, int numWorkers) throws IOException,
+      InterruptedException;
+
+  /**
+   * Create an edge reader for a given split. The framework will call
+   * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new edge reader for the split (the framework initializes
+   *         it afterwards)
+   * @throws IOException if the reader cannot be created
+   */
+  public abstract EdgeReader<I, E> createEdgeReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
new file mode 100644
index 0000000..9642fab
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Analogous to {@link org.apache.hadoop.mapreduce.RecordReader} for edges.
+ * Will read the edges from an input split.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public interface EdgeReader<I extends WritableComparable, E extends Writable> {
+  /**
+   * Use the input split and context to setup reading the edges.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading edges.
+   * @param context Context from the task.
+   * @throws IOException if the reader cannot be set up
+   * @throws InterruptedException if setup is interrupted
+   */
+  void initialize(InputSplit inputSplit, TaskAttemptContext context) throws
+      IOException, InterruptedException;
+
+  /**
+   * Move on to the next edge, if there is one.
+   * @return false iff there are no more edges
+   * @throws IOException if reading the input fails
+   * @throws InterruptedException if reading is interrupted
+   */
+  boolean nextEdge() throws IOException, InterruptedException;
+
+  /**
+   * Get the current edge.
+   *
+   * @return the current edge which has been read.
+   *         nextEdge() should be called first.
+   * @throws IOException if the edge cannot be produced
+   * @throws InterruptedException if the call is interrupted
+   */
+  EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+      InterruptedException;
+
+  /**
+   * Close this {@link EdgeReader} to future operations.
+   *
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+
+  /**
+   * Report how much of the input this {@link EdgeReader} has consumed,
+   * i.e. how far processing has progressed.
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException if progress cannot be determined
+   * @throws InterruptedException if the call is interrupted
+   */
+  float getProgress() throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
deleted file mode 100644
index ad4f2bf..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This VertexInputFormat is meant for testing/debugging.  It simply generates
- * some vertex data that can be consumed by test applications.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexInputFormat<
-    I extends WritableComparable, V extends Writable, E extends Writable,
-    M extends Writable> extends VertexInputFormat<I, V, E, M> {
-  @Override
-  public List<InputSplit> getSplits(JobContext context, int numWorkers)
-    throws IOException, InterruptedException {
-    // This is meaningless, the VertexReader will generate all the test
-    // data.
-    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-    for (int i = 0; i < numWorkers; ++i) {
-      inputSplitList.add(new BspInputSplit(i, numWorkers));
-    }
-    return inputSplitList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
deleted file mode 100644
index 114e75f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]
-import org.apache.hadoop.mapreduce.security.TokenCache;
-end[HADOOP_NON_SECURE]*/
-
-/**
- * Provides functionality similar to {@link FileInputFormat},
- * but allows for different data sources (vertex and edge data).
- *
- * @param <K> Key
- * @param <V> Value
- */
-public abstract class GiraphFileInputFormat<K, V>
-    extends FileInputFormat<K, V> {
-  /** Vertex input file paths. */
-  public static final String VERTEX_INPUT_DIR = "giraph.vertex.input.dir";
-  /** Edge input file paths. */
-  public static final String EDGE_INPUT_DIR = "giraph.edge.input.dir";
-  /** Number of vertex input files. */
-  public static final String NUM_VERTEX_INPUT_FILES =
-      "giraph.input.vertex.num.files";
-  /** Number of edge input files. */
-  public static final String NUM_EDGE_INPUT_FILES =
-      "giraph.input.edge.num.files";
-
-  /** Split slop. */
-  private static final double SPLIT_SLOP = 1.1; // 10% slop
-
-  /** Filter for hidden files. */
-  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
-
-  /** Class logger. */
-  private static final Logger LOG =
-      Logger.getLogger(GiraphFileInputFormat.class);
-
-  /**
-   * Add a {@link org.apache.hadoop.fs.Path} to the list of vertex inputs.
-   *
-   * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
-   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
-   *                                              vertex inputs
-   */
-  public static void addVertexInputPath(Job job, Path path) throws IOException {
-    Configuration conf = job.getConfiguration();
-    path = path.getFileSystem(conf).makeQualified(path);
-    String dirStr = StringUtils.escapeString(path.toString());
-    String dirs = conf.get(VERTEX_INPUT_DIR);
-    conf.set(VERTEX_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
-  }
-
-  /**
-   * Add a {@link org.apache.hadoop.fs.Path} to the list of edge inputs.
-   *
-   * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
-   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
-   *                                              edge inputs
-   */
-  public static void addEdgeInputPath(Job job, Path path) throws IOException {
-    Configuration conf = job.getConfiguration();
-    path = path.getFileSystem(conf).makeQualified(path);
-    String dirStr = StringUtils.escapeString(path.toString());
-    String dirs = conf.get(EDGE_INPUT_DIR);
-    conf.set(EDGE_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
-  }
-
-  /**
-   * Get the list of vertex input {@link Path}s.
-   *
-   * @param context The job
-   * @return The list of input {@link Path}s
-   */
-  public static Path[] getVertexInputPaths(JobContext context) {
-    String dirs = context.getConfiguration().get(VERTEX_INPUT_DIR, "");
-    String [] list = StringUtils.split(dirs);
-    Path[] result = new Path[list.length];
-    for (int i = 0; i < list.length; i++) {
-      result[i] = new Path(StringUtils.unEscapeString(list[i]));
-    }
-    return result;
-  }
-
-  /**
-   * Get the list of edge input {@link Path}s.
-   *
-   * @param context The job
-   * @return The list of input {@link Path}s
-   */
-  public static Path[] getEdgeInputPaths(JobContext context) {
-    String dirs = context.getConfiguration().get(EDGE_INPUT_DIR, "");
-    String [] list = StringUtils.split(dirs);
-    Path[] result = new Path[list.length];
-    for (int i = 0; i < list.length; i++) {
-      result[i] = new Path(StringUtils.unEscapeString(list[i]));
-    }
-    return result;
-  }
-
-  /**
-   * Proxy PathFilter that accepts a path only if all filters given in the
-   * constructor do. Used by the listPaths() to apply the built-in
-   * HIDDEN_FILE_FILTER together with a user provided one (if any).
-   */
-  private static class MultiPathFilter implements PathFilter {
-    /** List of filters. */
-    private List<PathFilter> filters;
-
-    /**
-     * Constructor.
-     *
-     * @param filters The list of filters
-     */
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    /**
-     * True iff all filters accept the given path.
-     *
-     * @param path The path to check
-     * @return Whether the path is accepted
-     */
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (!filter.accept(path)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  /**
-   * Common method for listing vertex/edge input directories.
-   *
-   * @param job The job
-   * @param dirs list of vertex/edge input paths
-   * @return Array of FileStatus objects
-   * @throws IOException
-   */
-  private List<FileStatus> listStatus(JobContext job, Path[] dirs)
-    throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]
-    // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
-        job.getConfiguration());
-end[HADOOP_NON_SECURE]*/
-
-    List<IOException> errors = new ArrayList<IOException>();
-
-    // creates a MultiPathFilter with the HIDDEN_FILE_FILTER and the
-    // user provided one (if any).
-    List<PathFilter> filters = new ArrayList<PathFilter>();
-    filters.add(HIDDEN_FILE_FILTER);
-    PathFilter jobFilter = getInputPathFilter(job);
-    if (jobFilter != null) {
-      filters.add(jobFilter);
-    }
-    PathFilter inputFilter = new MultiPathFilter(filters);
-
-    for (Path p : dirs) {
-      FileSystem fs = p.getFileSystem(job.getConfiguration());
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
-      if (matches == null) {
-        errors.add(new IOException("Input path does not exist: " + p));
-      } else if (matches.length == 0) {
-        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
-      } else {
-        for (FileStatus globStat: matches) {
-          if (globStat.isDir()) {
-            Collections.addAll(result, fs.listStatus(globStat.getPath()));
-          } else {
-            result.add(globStat);
-          }
-        }
-      }
-    }
-
-    if (!errors.isEmpty()) {
-      throw new InvalidInputException(errors);
-    }
-    LOG.info("Total input paths to process : " + result.size());
-    return result;
-  }
-
-  /**
-   * List vertex input directories.
-   *
-   * @param job the job to list vertex input paths for
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected List<FileStatus> listVertexStatus(JobContext job)
-    throws IOException {
-    return listStatus(job, getVertexInputPaths(job));
-  }
-
-  /**
-   * List edge input directories.
-   *
-   * @param job the job to list edge input paths for
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected List<FileStatus> listEdgeStatus(JobContext job)
-    throws IOException {
-    return listStatus(job, getEdgeInputPaths(job));
-  }
-
-  /**
-   * Common method for generating the list of vertex/edge input splits.
-   *
-   * @param job The job
-   * @param files Array of FileStatus objects for vertex/edge input files
-   * @return The list of vertex/edge input splits
-   * @throws IOException
-   */
-  private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
-    throws IOException {
-    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
-    long maxSize = getMaxSplitSize(job);
-
-    // generate splits
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-
-    for (FileStatus file: files) {
-      Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(job.getConfiguration());
-      long length = file.getLen();
-      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(job, path)) {
-        long blockSize = file.getBlockSize();
-        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-
-        long bytesRemaining = length;
-        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
-              blkLocations[blkIndex].getHosts()));
-          bytesRemaining -= splitSize;
-        }
-
-        if (bytesRemaining != 0) {
-          splits.add(new FileSplit(path, length - bytesRemaining,
-              bytesRemaining,
-              blkLocations[blkLocations.length - 1].getHosts()));
-        }
-      } else if (length != 0) {
-        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
-      } else {
-        //Create empty hosts array for zero length files
-        splits.add(new FileSplit(path, 0, length, new String[0]));
-      }
-    }
-    return splits;
-  }
-
-  /**
-   * Generate the list of vertex input splits.
-   *
-   * @param job The job
-   * @return The list of vertex input splits
-   * @throws IOException
-   */
-  public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
-    List<FileStatus> files = listVertexStatus(job);
-    List<InputSplit> splits = getSplits(job, files);
-    // Save the number of input files in the job-conf
-    job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
-    LOG.debug("Total # of vertex splits: " + splits.size());
-    return splits;
-  }
-
-  /**
-   * Generate the list of edge input splits.
-   *
-   * @param job The job
-   * @return The list of edge input splits
-   * @throws IOException
-   */
-  public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
-    List<FileStatus> files = listEdgeStatus(job);
-    List<InputSplit> splits = getSplits(job, files);
-    // Save the number of input files in the job-conf
-    job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
-    LOG.debug("Total # of edge splits: " + splits.size());
-    return splits;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
new file mode 100644
index 0000000..ca725ca
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
+ */
+public interface GiraphInputFormat {
+  /**
+   * Get the list of input splits for the format.
+   *
+   * @param context The job context
+   * @param numWorkers Number of workers
+   * @return The list of input splits
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
deleted file mode 100644
index 113b2ea..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
-
-/**
- * Provides functionality similar to {@link org.apache.hadoop
- * .mapreduce.lib.input.TextInputFormat}, but allows for different data
- * sources (vertex and edge data).
- */
-public class GiraphTextInputFormat
-    extends GiraphFileInputFormat<LongWritable, Text> {
-  @Override
-  public RecordReader<LongWritable, Text>
-  createRecordReader(InputSplit split, TaskAttemptContext context) {
-    return new LineRecordReader();
-  }
-
-  @Override
-  protected boolean isSplitable(JobContext context, Path file) {
-    CompressionCodec codec =
-        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
-    return codec == null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
deleted file mode 100644
index 53dd112..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Write out Vertices' IDs and values, but not their edges nor edges' values.
- * This is a useful output format when the final value of the vertex is
- * all that's needed. The boolean configuration parameter reverse.id.and.value
- * allows reversing the output of id and value.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class IdWithValueTextOutputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends TextVertexOutputFormat<I, V, E> {
-
-  /** Specify the output delimiter */
-  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
-  /** Default output delimiter */
-  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-  /** Reverse id and value order? */
-  public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
-  /** Default is to not reverse id and value order. */
-  public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
-
-  @Override
-  public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
-    return new IdWithValueVertexWriter();
-  }
-
-  /**
-   * Vertex writer used with {@link IdWithValueTextOutputFormat}.
-   */
-  protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
-    /** Saved delimiter */
-    private String delimiter;
-    /** Cached reserve option */
-    private boolean reverseOutput;
-
-    @Override
-    public void initialize(TaskAttemptContext context) throws IOException,
-        InterruptedException {
-      super.initialize(context);
-      Configuration conf = context.getConfiguration();
-      delimiter = conf
-          .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
-      reverseOutput = conf
-          .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
-    }
-
-    @Override
-    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
-      throws IOException {
-      String first;
-      String second;
-      if (reverseOutput) {
-        first = vertex.getValue().toString();
-        second = vertex.getId().toString();
-      } else {
-        first = vertex.getId().toString();
-        second = vertex.getValue().toString();
-      }
-      Text line = new Text(first + delimiter + second);
-      return line;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
deleted file mode 100644
index 9aa21c7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.regex.Pattern;
-
-/**
- * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
- * unweighted graphs with int ids.
- *
- * Each line consists of: vertex neighbor1 neighbor2 ...
- */
-public class IntIntNullIntTextInputFormat extends
-    TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
-    IntWritable> {
-  /** Separator of the vertex and neighbors */
-  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
-  @Override
-  public TextVertexReader createVertexReader(InputSplit split,
-      TaskAttemptContext context)
-    throws IOException {
-    return new IntIntNullIntVertexReader();
-  }
-
-  /**
-   * Vertex reader associated with {@link IntIntNullIntTextInputFormat}.
-   */
-  public class IntIntNullIntVertexReader extends
-    TextVertexReaderFromEachLineProcessed<String[]> {
-    /**
-     * Cached vertex id for the current line
-     */
-    private IntWritable id;
-
-    @Override
-    protected String[] preprocessLine(Text line) throws IOException {
-      String[] tokens = SEPARATOR.split(line.toString());
-      id = new IntWritable(Integer.parseInt(tokens[0]));
-      return tokens;
-    }
-
-    @Override
-    protected IntWritable getId(String[] tokens) throws IOException {
-      return id;
-    }
-
-    @Override
-    protected IntWritable getValue(String[] tokens) throws IOException {
-      return id;
-    }
-
-    @Override
-    protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
-        String[] tokens) throws IOException {
-      List<Edge<IntWritable, NullWritable>> edges =
-          Lists.newArrayListWithCapacity(tokens.length - 1);
-      for (int n = 1; n < tokens.length; n++) {
-        edges.add(new Edge<IntWritable, NullWritable>(
-            new IntWritable(Integer.parseInt(tokens[n])),
-            NullWritable.get()));
-      }
-      return edges;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
deleted file mode 100644
index 016e74c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.utils.IntPair;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-/**
- * Simple text-based {@link org.apache.giraph.graph.VertexValueInputFormat}
- * for integer ids and values.
- *
- * Each line consists of: id, value
- *
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class IntIntTextVertexValueInputFormat<E extends Writable,
-    M extends Writable> extends
-    TextVertexValueInputFormat<IntWritable, IntWritable, E, M> {
-  /** Separator for id and value */
-  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
-  @Override
-  public TextVertexValueReader createVertexValueReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
-    return new IntIntTextVertexValueReader();
-  }
-
-  /**
-   * {@link org.apache.giraph.graph.VertexValueReader} associated with
-   * {@link IntIntTextVertexValueInputFormat}.
-   */
-  public class IntIntTextVertexValueReader extends
-      TextVertexValueReaderFromEachLineProcessed<IntPair> {
-
-    @Override
-    protected IntPair preprocessLine(Text line) throws IOException {
-      String[] tokens = SEPARATOR.split(line.toString());
-      return new IntPair(Integer.valueOf(tokens[0]),
-          Integer.valueOf(tokens[1]));
-    }
-
-    @Override
-    protected IntWritable getId(IntPair data) throws IOException {
-      return new IntWritable(data.getFirst());
-    }
-
-    @Override
-    protected IntWritable getValue(IntPair data) throws IOException {
-      return new IntWritable(data.getSecond());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
deleted file mode 100644
index 4d98657..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
-
-/**
- * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
- * unweighted graphs without edges or values, just vertices with ids.
- *
- * Each line is just simply the vertex id.
- */
-public class IntNullNullNullTextInputFormat extends TextVertexInputFormat<
-    IntWritable, NullWritable, NullWritable, NullWritable> {
-  @Override
-  public TextVertexReader createVertexReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
-    return new IntNullNullNullVertexReader();
-  }
-
-  /**
-   * Reader for this InputFormat.
-   */
-  public class IntNullNullNullVertexReader extends
-      TextVertexReaderFromEachLineProcessed<String> {
-    /** Cached vertex id */
-    private IntWritable id;
-
-    @Override
-    protected String preprocessLine(Text line) throws IOException {
-      id = new IntWritable(Integer.parseInt(line.toString()));
-      return line.toString();
-    }
-
-    @Override
-    protected IntWritable getId(String line) throws IOException {
-      return id;
-    }
-
-    @Override
-    protected NullWritable getValue(String line) throws IOException {
-      return NullWritable.get();
-    }
-
-    @Override
-    protected Iterable<Edge<IntWritable, NullWritable>> getEdges(String line)
-      throws IOException {
-      return ImmutableList.of();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
deleted file mode 100644
index ed13b45..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.utils.IntPair;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-/**
- * Simple text-based {@link org.apache.giraph.graph.EdgeInputFormat} for
- * unweighted graphs with int ids.
- *
- * Each line consists of: source_vertex, target_vertex
- */
-public class IntNullTextEdgeInputFormat extends
-    TextEdgeInputFormat<IntWritable, NullWritable> {
-  /** Splitter for endpoints */
-  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
-  @Override
-  public TextEdgeReader createEdgeReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
-    return new IntNullTextEdgeReader();
-  }
-
-  /**
-   * {@link org.apache.giraph.graph.EdgeReader} associated with
-   * {@link IntNullTextEdgeInputFormat}.
-   */
-  public class IntNullTextEdgeReader extends
-      TextEdgeReaderFromEachLineProcessed<IntPair> {
-    @Override
-    protected IntPair preprocessLine(Text line) throws IOException {
-      String[] tokens = SEPARATOR.split(line.toString());
-      return new IntPair(Integer.valueOf(tokens[0]),
-          Integer.valueOf(tokens[1]));
-    }
-
-    @Override
-    protected IntWritable getSourceVertexId(IntPair endpoints)
-      throws IOException {
-      return new IntWritable(endpoints.getFirst());
-    }
-
-    @Override
-    protected IntWritable getTargetVertexId(IntPair endpoints)
-      throws IOException {
-      return new IntWritable(endpoints.getSecond());
-    }
-
-    @Override
-    protected NullWritable getValue(IntPair endpoints) throws IOException {
-      return NullWritable.get();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
deleted file mode 100644
index c2c1cbb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-/**
- * Keeps the vertex keys for the input/output vertex format
- */
-public class JsonBase64VertexFormat {
-  /** Vertex id key */
-  public static final String VERTEX_ID_KEY = "vertexId";
-  /** Vertex value key*/
-  public static final String VERTEX_VALUE_KEY = "vertexValue";
-  /** Edge value array key (all the edges are stored here) */
-  public static final String EDGE_ARRAY_KEY = "edgeArray";
-
-  /**
-   * Don't construct.
-   */
-  private JsonBase64VertexFormat() { }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
deleted file mode 100644
index cc5872c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.google.common.collect.Lists;
-import net.iharder.Base64;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Simple way to represent the structure of the graph with a JSON object.
- * The actual vertex ids, values, edges are stored by the
- * Writable serialized bytes that are Byte64 encoded.
- * Works with {@link JsonBase64VertexOutputFormat}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class JsonBase64VertexInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends TextVertexInputFormat<I, V, E, M> {
-
-  @Override
-  public TextVertexReader createVertexReader(InputSplit split,
-      TaskAttemptContext context) {
-    return new JsonBase64VertexReader();
-  }
-
-  /**
-   * Simple reader that supports {@link JsonBase64VertexInputFormat}
-   */
-  protected class JsonBase64VertexReader extends
-    TextVertexReaderFromEachLineProcessed<JSONObject> {
-
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      super.initialize(inputSplit, context);
-    }
-
-    @Override
-    protected JSONObject preprocessLine(Text line) {
-      try {
-        return new JSONObject(line.toString());
-      } catch (JSONException e) {
-        throw new IllegalArgumentException(
-          "next: Failed to get the vertex", e);
-      }
-    }
-
-    @Override
-    protected I getId(JSONObject vertexObject) throws IOException {
-      try {
-        byte[] decodedWritable = Base64.decode(
-            vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
-        DataInput input = new DataInputStream(
-            new ByteArrayInputStream(decodedWritable));
-        I vertexId = getConf().createVertexId();
-        vertexId.readFields(input);
-        return vertexId;
-      } catch (JSONException e) {
-        throw new IllegalArgumentException(
-          "next: Failed to get vertex id", e);
-      }
-    }
-
-    @Override
-    protected V getValue(JSONObject vertexObject) throws IOException {
-      try {
-        byte[] decodedWritable = Base64.decode(
-            vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
-        DataInputStream input = new DataInputStream(
-            new ByteArrayInputStream(decodedWritable));
-        V vertexValue = getConf().createVertexValue();
-        vertexValue.readFields(input);
-        return vertexValue;
-      } catch (JSONException e) {
-        throw new IllegalArgumentException(
-          "next: Failed to get vertex value", e);
-      }
-    }
-
-    @Override
-    protected Iterable<Edge<I, E>> getEdges(JSONObject vertexObject) throws
-    IOException {
-      JSONArray edgeArray = null;
-      try {
-        edgeArray = vertexObject.getJSONArray(
-          JsonBase64VertexFormat.EDGE_ARRAY_KEY);
-      } catch (JSONException e) {
-        throw new IllegalArgumentException(
-          "next: Failed to get edge array", e);
-      }
-      byte[] decodedWritable;
-      List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(
-          edgeArray.length());
-      for (int i = 0; i < edgeArray.length(); ++i) {
-        try {
-          decodedWritable = Base64.decode(edgeArray.getString(i));
-        } catch (JSONException e) {
-          throw new IllegalArgumentException(
-            "next: Failed to get edge value", e);
-        }
-        DataInputStream input = new DataInputStream(
-            new ByteArrayInputStream(decodedWritable));
-        I targetVertexId = getConf().createVertexId();
-        targetVertexId.readFields(input);
-        E edgeValue = getConf().createEdgeValue();
-        edgeValue.readFields(input);
-        edges.add(new Edge<I, E>(targetVertexId, edgeValue));
-      }
-      return edges;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
deleted file mode 100644
index eb3c9ac..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import net.iharder.Base64;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * Simple way to represent the structure of the graph with a JSON object.
- * The actual vertex ids, values, edges are stored by the
- * Writable serialized bytes that are Byte64 encoded.
- * Works with {@link JsonBase64VertexInputFormat}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class JsonBase64VertexOutputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable> extends
-    TextVertexOutputFormat<I, V, E> {
-
-  @Override
-  public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
-    return new JsonBase64VertexWriter();
-  }
-
-  /**
-   * Simple writer that supports {@link JsonBase64VertexOutputFormat}
-   */
-  protected class JsonBase64VertexWriter extends TextVertexWriterToEachLine {
-
-    @Override
-    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
-      throws IOException {
-      ByteArrayOutputStream outputStream =
-          new ByteArrayOutputStream();
-      DataOutput output = new DataOutputStream(outputStream);
-      JSONObject vertexObject = new JSONObject();
-      vertex.getId().write(output);
-      try {
-        vertexObject.put(
-          JsonBase64VertexFormat.VERTEX_ID_KEY,
-          Base64.encodeBytes(outputStream.toByteArray()));
-      } catch (JSONException e) {
-        throw new IllegalStateException(
-            "writerVertex: Failed to insert vertex id", e);
-      }
-      outputStream.reset();
-      vertex.getValue().write(output);
-      try {
-        vertexObject.put(
-          JsonBase64VertexFormat.VERTEX_VALUE_KEY,
-          Base64.encodeBytes(outputStream.toByteArray()));
-      } catch (JSONException e) {
-        throw new IllegalStateException(
-            "writerVertex: Failed to insert vertex value", e);
-      }
-      JSONArray edgeArray = new JSONArray();
-      for (Edge<I, E> edge : vertex.getEdges()) {
-        outputStream.reset();
-        edge.getTargetVertexId().write(output);
-        edge.getValue().write(output);
-        edgeArray.put(Base64.encodeBytes(outputStream.toByteArray()));
-      }
-      try {
-        vertexObject.put(
-          JsonBase64VertexFormat.EDGE_ARRAY_KEY,
-          edgeArray);
-      } catch (JSONException e) {
-        throw new IllegalStateException(
-            "writerVertex: Failed to insert edge array", e);
-      }
-      return new Text(vertexObject.toString());
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
deleted file mode 100644
index c01d442..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.json.JSONArray;
-import org.json.JSONException;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
-  * VertexInputFormat that features <code>long</code> vertex IDs,
-  * <code>double</code> vertex values, <code>float</code>
-  * out-edge weights, and <code>double</code> message types,
-  * specified in JSON format.
-  */
-public class JsonLongDoubleFloatDoubleVertexInputFormat extends
-  TextVertexInputFormat<LongWritable, DoubleWritable,
-  FloatWritable, DoubleWritable> {
-
-  @Override
-  public TextVertexReader createVertexReader(InputSplit split,
-      TaskAttemptContext context) {
-    return new JsonLongDoubleFloatDoubleVertexReader();
-  }
-
- /**
-  * VertexReader that features <code>double</code> vertex
-  * values and <code>float</code> out-edge weights. The
-  * files should be in the following JSON format:
-  * JSONArray(<vertex id>, <vertex value>,
-  *   JSONArray(JSONArray(<dest vertex id>, <edge value>), ...))
-  * Here is an example with vertex id 1, vertex value 4.3, and two edges.
-  * First edge has a destination vertex 2, edge value 2.1.
-  * Second edge has a destination vertex 3, edge value 0.7.
-  * [1,4.3,[[2,2.1],[3,0.7]]]
-  */
-  class JsonLongDoubleFloatDoubleVertexReader extends
-    TextVertexReaderFromEachLineProcessedHandlingExceptions<JSONArray,
-    JSONException> {
-
-    @Override
-    protected JSONArray preprocessLine(Text line) throws JSONException {
-      return new JSONArray(line.toString());
-    }
-
-    @Override
-    protected LongWritable getId(JSONArray jsonVertex) throws JSONException,
-              IOException {
-      return new LongWritable(jsonVertex.getLong(0));
-    }
-
-    @Override
-    protected DoubleWritable getValue(JSONArray jsonVertex) throws
-      JSONException, IOException {
-      return new DoubleWritable(jsonVertex.getDouble(1));
-    }
-
-    @Override
-    protected Iterable<Edge<LongWritable, FloatWritable>> getEdges(
-        JSONArray jsonVertex) throws JSONException, IOException {
-      JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
-      List<Edge<LongWritable, FloatWritable>> edges =
-          Lists.newArrayListWithCapacity(jsonEdgeArray.length());
-      for (int i = 0; i < jsonEdgeArray.length(); ++i) {
-        JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
-        edges.add(new Edge<LongWritable, FloatWritable>(
-            new LongWritable(jsonEdge.getLong(0)),
-            new FloatWritable((float) jsonEdge.getDouble(1))));
-      }
-      return edges;
-    }
-
-    @Override
-    protected Vertex<LongWritable, DoubleWritable, FloatWritable,
-              DoubleWritable> handleException(Text line, JSONArray jsonVertex,
-                  JSONException e) {
-      throw new IllegalArgumentException(
-          "Couldn't get vertex from line " + line, e);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
deleted file mode 100644
index 2ab7032..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.json.JSONArray;
-import org.json.JSONException;
-
-import java.io.IOException;
-
-/**
- * VertexOutputFormat that supports JSON-encoded vertices featuring
- * <code>double</code> values and <code>float</code> out-edge weights.
- */
-public class JsonLongDoubleFloatDoubleVertexOutputFormat extends
-  TextVertexOutputFormat<LongWritable, DoubleWritable,
-  FloatWritable> {
-
-  @Override
-  public TextVertexWriter createVertexWriter(
-      TaskAttemptContext context) {
-    return new JsonLongDoubleFloatDoubleVertexWriter();
-  }
-
- /**
-  * VertexWriter that supports vertices with <code>double</code>
-  * values and <code>float</code> out-edge weights.
-  */
-  private class JsonLongDoubleFloatDoubleVertexWriter extends
-    TextVertexWriterToEachLine {
-    @Override
-    public Text convertVertexToLine(
-      Vertex<LongWritable, DoubleWritable,
-        FloatWritable, ?> vertex
-    ) throws IOException {
-      JSONArray jsonVertex = new JSONArray();
-      try {
-        jsonVertex.put(vertex.getId().get());
-        jsonVertex.put(vertex.getValue().get());
-        JSONArray jsonEdgeArray = new JSONArray();
-        for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
-          JSONArray jsonEdge = new JSONArray();
-          jsonEdge.put(edge.getTargetVertexId().get());
-          jsonEdge.put(edge.getValue().get());
-          jsonEdgeArray.put(jsonEdge);
-        }
-        jsonVertex.put(jsonEdgeArray);
-      } catch (JSONException e) {
-        throw new IllegalArgumentException(
-          "writeVertex: Couldn't write vertex " + vertex);
-      }
-      return new Text(jsonVertex.toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
deleted file mode 100644
index acae10b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * InputFormat for reading graphs stored as (ordered) adjacency lists
- * with long vertex ids and double vertex values and edge values.
- * For example:
- * 22 0.1 45 0.3 99 0.44
- * to represent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99,
- * with values of 0.3 and 0.44, respectively.
- *
- * @param <M> Message data
- */
-public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
-    extends AdjacencyListTextVertexInputFormat<LongWritable, DoubleWritable,
-    DoubleWritable, M> {
-
-  @Override
-  public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
-      TaskAttemptContext context) {
-    return new LongDoubleDoubleAdjacencyListVertexReader(null);
-  }
-
-  /**
-   * VertexReader associated with
-   * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}.
-   */
-  protected class LongDoubleDoubleAdjacencyListVertexReader extends
-      AdjacencyListTextVertexReader {
-
-    /**
-     * Constructor with {@link LineSanitizer}.
-     *
-     * @param lineSanitizer the sanitizer to use for reading
-     */
-    public LongDoubleDoubleAdjacencyListVertexReader(LineSanitizer
-        lineSanitizer) {
-      super(lineSanitizer);
-    }
-
-    @Override
-    public LongWritable decodeId(String s) {
-      return new LongWritable(Long.valueOf(s));
-    }
-
-    @Override
-    public DoubleWritable decodeValue(String s) {
-      return new DoubleWritable(Double.valueOf(s));
-    }
-
-    @Override
-    public Edge<LongWritable, DoubleWritable> decodeEdge(
-        String s1,
-        String s2) {
-      return new Edge<LongWritable, DoubleWritable>(
-          new LongWritable(Long.valueOf(s1)),
-          new DoubleWritable(Double.valueOf(s2)));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomEdgeInputFormat.java
deleted file mode 100644
index 3311691..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomEdgeInputFormat.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import com.google.common.collect.Sets;
-import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeInputFormat;
-import org.apache.giraph.graph.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * This {@link EdgeInputFormat} generates pseudo-random edges on the fly.
- * As with {@link PseudoRandomVertexInputFormat}, the user specifies the
- * number of vertices and the number of edges per vertex.
- */
-public class PseudoRandomEdgeInputFormat
-    extends EdgeInputFormat<LongWritable, DoubleWritable> {
-  /** Set the number of aggregate vertices. */
-  public static final String AGGREGATE_VERTICES =
-      "pseudoRandomEdgeInputFormat.aggregateVertices";
-  /** Set the number of edges per vertex (pseudo-random destination). */
-  public static final String EDGES_PER_VERTEX =
-      "pseudoRandomEdgeInputFormat.edgesPerVertex";
-
-  @Override
-  public final List<InputSplit> getSplits(final JobContext context,
-                                          final int numWorkers)
-    throws IOException, InterruptedException {
-    // The split contents are meaningless; the PseudoRandomEdgeReader
-    // generates all the test data
-    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-    for (int i = 0; i < numWorkers; ++i) {
-      inputSplitList.add(new BspInputSplit(i, numWorkers));
-    }
-    return inputSplitList;
-  }
-
-  @Override
-  public EdgeReader<LongWritable, DoubleWritable> createEdgeReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
-    return new PseudoRandomEdgeReader();
-  }
-
-  /**
-   * {@link EdgeReader} that generates pseudo-random edges.
-   */
-  private static class PseudoRandomEdgeReader
-      implements EdgeReader<LongWritable, DoubleWritable> {
-    /** Logger. */
-    private static final Logger LOG =
-        Logger.getLogger(PseudoRandomEdgeReader.class);
-    /** Starting vertex id. */
-    private long startingVertexId = -1;
-    /** Vertices read so far. */
-    private long verticesRead = 0;
-    /** Total vertices to read (on this split alone). */
-    private long totalSplitVertices = -1;
-    /** Current vertex id. */
-    private LongWritable currentVertexId = new LongWritable(-1);
-    /** Edges read for the current vertex. */
-    private int currentVertexEdgesRead = 0;
-    /** Target vertices of edges for current vertex. */
-    private Set<LongWritable> currentVertexDestVertices = Sets.newHashSet();
-    /** Random number generator for the current vertex (for consistency
-     * across runs on different numbers of workers). */
-    private Random random = new Random();
-    /** Aggregate vertices (all input splits). */
-    private long aggregateVertices = -1;
-    /** Edges per vertex. */
-    private long edgesPerVertex = -1;
-    /** BspInputSplit (used only for index). */
-    private BspInputSplit bspInputSplit;
-    /** Saved configuration */
-    private ImmutableClassesGiraphConfiguration configuration;
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      configuration = new ImmutableClassesGiraphConfiguration(
-          context.getConfiguration());
-      aggregateVertices =
-          configuration.getLong(
-              PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES, 0);
-      if (aggregateVertices <= 0) {
-        throw new IllegalArgumentException(
-            PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES + " <= 0");
-      }
-      if (inputSplit instanceof BspInputSplit) {
-        bspInputSplit = (BspInputSplit) inputSplit;
-        long extraVertices =
-            aggregateVertices % bspInputSplit.getNumSplits();
-        totalSplitVertices =
-            aggregateVertices / bspInputSplit.getNumSplits();
-        if (bspInputSplit.getSplitIndex() < extraVertices) {
-          ++totalSplitVertices;
-        }
-        startingVertexId = (bspInputSplit.getSplitIndex() *
-            (aggregateVertices / bspInputSplit.getNumSplits())) +
-            Math.min(bspInputSplit.getSplitIndex(),
-                extraVertices);
-      } else {
-        throw new IllegalArgumentException(
-            "initialize: Got " + inputSplit.getClass() +
-                " instead of " + BspInputSplit.class);
-      }
-      edgesPerVertex = configuration.getLong(
-          PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, 0);
-      if (edgesPerVertex <= 0) {
-        throw new IllegalArgumentException(
-            PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX + " <= 0");
-      }
-    }
-
-    @Override
-    public boolean nextEdge() throws IOException, InterruptedException {
-      return totalSplitVertices > verticesRead + 1 ||
-          (totalSplitVertices == verticesRead + 1 &&
-              edgesPerVertex > currentVertexEdgesRead);
-    }
-
-    @Override
-    public EdgeWithSource<LongWritable, DoubleWritable> getCurrentEdge()
-      throws IOException, InterruptedException {
-      if (currentVertexEdgesRead == edgesPerVertex) {
-        ++verticesRead;
-        currentVertexId = new LongWritable(-1);
-      }
-
-      if (currentVertexId.get() == -1) {
-        currentVertexId.set(startingVertexId + verticesRead);
-        currentVertexEdgesRead = 0;
-        // Seed on the vertex id so that the generated vertex data stays
-        // the same when run on a different number of workers, provided
-        // the other parameters are unchanged.
-        random.setSeed(currentVertexId.get());
-        currentVertexDestVertices.clear();
-      }
-
-      LongWritable destVertexId;
-      do {
-        destVertexId =
-            new LongWritable(Math.abs(random.nextLong()) %
-                aggregateVertices);
-      } while (currentVertexDestVertices.contains(destVertexId));
-      ++currentVertexEdgesRead;
-      currentVertexDestVertices.add(destVertexId);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
-            "" + destVertexId + ")");
-      }
-      return new EdgeWithSource<LongWritable, DoubleWritable>(
-          currentVertexId,
-          new Edge<LongWritable, DoubleWritable>(
-              destVertexId,
-              new DoubleWritable(random.nextDouble())));
-    }
-
-    @Override
-    public void close() throws IOException { }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return (verticesRead * edgesPerVertex + currentVertexEdgesRead) *
-          100.0f / (totalSplitVertices * edgesPerVertex);
-    }
-  }
-}