You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC
[13/23] GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
deleted file mode 100644
index 3b047f3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * VertexReader that readers lines of text with vertices encoded as adjacency
- * lists and converts each token to the correct type. For example, a graph
- * with vertices as integers and values as doubles could be encoded as:
- * 1 0.1 2 0.2 3 0.3
- * to represent a vertex named 1, with 0.1 as its value and two edges, to
- * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class AdjacencyListTextVertexInputFormat<I extends
- WritableComparable, V extends Writable, E extends Writable, M extends
- Writable> extends TextVertexInputFormat<I, V, E, M> {
- /** Delimiter for split */
- public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
- /** Default delimiter for split */
- public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-
- /**
- * Utility for doing any cleaning of each line before it is tokenized.
- */
- public interface LineSanitizer {
- /**
- * Clean string s before attempting to tokenize it.
- *
- * @param s String to be cleaned.
- * @return Sanitized string.
- */
- String sanitize(String s);
- }
-
- @Override
- public abstract AdjacencyListTextVertexReader createVertexReader(
- InputSplit split, TaskAttemptContext context);
-
- /**
- * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.
- */
- protected abstract class AdjacencyListTextVertexReader extends
- TextVertexReaderFromEachLineProcessed<String[]> {
- /**
- * Cached configuration.
- */
- private Configuration conf;
-
- /** Cached delimiter used for split */
- private String splitValue = null;
-
- /**
- * Sanitizer from constructor.
- */
- private final LineSanitizer sanitizer;
-
-
- /**
- * Constructor without line sanitizer.
- *
- */
- public AdjacencyListTextVertexReader() {
- this(null);
- }
-
- /**
- * Constructor with line sanitizer.
- *
- * @param sanitizer Sanitizer to be used.
- */
- public AdjacencyListTextVertexReader(LineSanitizer sanitizer) {
- this.sanitizer = sanitizer;
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- super.initialize(inputSplit, context);
- conf = context.getConfiguration();
- splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
- }
-
- @Override
- protected String[] preprocessLine(Text line) throws IOException {
- String sanitizedLine;
- if (sanitizer != null) {
- sanitizedLine = sanitizer.sanitize(line.toString());
- } else {
- sanitizedLine = line.toString();
- }
- String [] values = sanitizedLine.split(splitValue);
- if ((values.length < 2) || (values.length % 2 != 0)) {
- throw new IllegalArgumentException(
- "Line did not split correctly: " + line);
- }
- return values;
- }
-
- @Override
- protected I getId(String[] values) throws IOException {
- return decodeId(values[0]);
- }
-
- /**
- * Decode the id for this line into an instance of its correct type.
- *
- * @param s Id of vertex from line
- * @return Vertex id
- */
- public abstract I decodeId(String s);
-
- @Override
- protected V getValue(String[] values) throws IOException {
- return decodeValue(values[1]);
- }
-
-
- /**
- * Decode the value for this line into an instance of its correct type.
- *
- * @param s Value from line
- * @return Vertex value
- */
- public abstract V decodeValue(String s);
-
- @Override
- protected Iterable<Edge<I, E>> getEdges(String[] values) throws
- IOException {
- int i = 2;
- List<Edge<I, E>> edges = Lists.newLinkedList();
- while (i < values.length) {
- edges.add(decodeEdge(values[i], values[i + 1]));
- i += 2;
- }
- return edges;
- }
-
- /**
- * Decode an edge from the line into an instance of a correctly typed Edge
- *
- * @param id The edge's id from the line
- * @param value The edge's value from the line
- * @return Edge with given target id and value
- */
- public abstract Edge<I, E> decodeEdge(String id, String value);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
deleted file mode 100644
index 77abb85..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * OutputFormat to write out the graph nodes as text, value-separated (by
- * tabs, by default). With the default delimiter, a vertex is written out as:
- *
- * <VertexId><tab><Vertex Value><tab>[<EdgeId><tab><EdgeValue>]+
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends TextVertexOutputFormat<I, V, E> {
-
- /** Split delimiter */
- public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
- /** Default split delimiter */
- public static final String LINE_TOKENIZE_VALUE_DEFAULT =
- AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE_DEFAULT;
-
- @Override
- public AdjacencyListTextVertexWriter createVertexWriter(
- TaskAttemptContext context) {
- return new AdjacencyListTextVertexWriter();
- }
-
- /**
- * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
- */
- protected class AdjacencyListTextVertexWriter extends
- TextVertexWriterToEachLine {
- /** Cached split delimeter */
- private String delimiter;
-
- @Override
- public void initialize(TaskAttemptContext context) throws IOException,
- InterruptedException {
- super.initialize(context);
- delimiter = context.getConfiguration()
- .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
- }
-
- @Override
- public Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
- throws IOException {
- StringBuffer sb = new StringBuffer(vertex.getId().toString());
- sb.append(delimiter);
- sb.append(vertex.getValue());
-
- for (Edge<I, E> edge : vertex.getEdges()) {
- sb.append(delimiter).append(edge.getTargetVertexId());
- sb.append(delimiter).append(edge.getValue());
- }
-
- return new Text(sb.toString());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/BasicVertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/BasicVertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/io/BasicVertexValueReader.java
new file mode 100644
index 0000000..1ccde39
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/BasicVertexValueReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for {@link VertexReader} implementations that also
+ * expose the id and the value of the vertex currently being read, via
+ * {@link #getCurrentVertexId()} and {@link #getCurrentVertexValue()}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class BasicVertexValueReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements VertexReader<I, V, E, M> {
+  /**
+   * User-defined method to extract the id of the current vertex.
+   *
+   * @return The vertex id
+   * @throws IOException on I/O errors raised by the implementation
+   * @throws InterruptedException if the reading thread is interrupted
+   */
+  public abstract I getCurrentVertexId() throws IOException,
+      InterruptedException;
+
+  /**
+   * User-defined method to extract the value of the current vertex.
+   *
+   * @return The vertex value
+   * @throws IOException on I/O errors raised by the implementation
+   * @throws InterruptedException if the reading thread is interrupted
+   */
+  public abstract V getCurrentVertexValue() throws IOException,
+      InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
new file mode 100644
index 0000000..0d5c43f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Input format for reading single edges.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeInputFormat<I extends WritableComparable,
+    E extends Writable> implements GiraphInputFormat {
+  /**
+   * Logically split the edges for a graph processing application.
+   *
+   * <p>Each {@link InputSplit} is then assigned to a worker for processing.
+   *
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. For example, a split
+   * could be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The
+   * InputFormat also creates the {@link EdgeReader} to read the
+   * {@link InputSplit}.
+   *
+   * <p>The number of workers is a hint given to the developer to try to
+   * intelligently determine how many splits to create (if this is
+   * adjustable) at runtime.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job
+   * @return a list of {@link InputSplit}s for the job.
+   * @throws IOException on I/O errors while computing the splits
+   * @throws InterruptedException if interrupted while computing the splits
+   */
+  @Override
+  public abstract List<InputSplit> getSplits(
+      JobContext context, int numWorkers) throws IOException,
+      InterruptedException;
+
+  /**
+   * Create an edge reader for a given split. The framework will call
+   * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new edge reader
+   * @throws IOException on I/O errors while creating the reader
+   */
+  public abstract EdgeReader<I, E> createEdgeReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
new file mode 100644
index 0000000..9642fab
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Analogous to {@link org.apache.hadoop.mapreduce.RecordReader} for edges.
+ * Reads the edges from an input split.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public interface EdgeReader<I extends WritableComparable, E extends Writable> {
+  /**
+   * Use the input split and context to setup reading the edges.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading edges.
+   * @param context Context from the task.
+   * @throws IOException on I/O errors while setting up the reader
+   * @throws InterruptedException if interrupted during setup
+   */
+  void initialize(InputSplit inputSplit, TaskAttemptContext context) throws
+      IOException, InterruptedException;
+
+  /**
+   * Advance to the next edge in the split.
+   *
+   * @return false iff there are no more edges
+   * @throws IOException on I/O errors while reading
+   * @throws InterruptedException if interrupted while reading
+   */
+  boolean nextEdge() throws IOException, InterruptedException;
+
+  /**
+   * Get the current edge. {@link #nextEdge()} should be called first.
+   *
+   * @return the current edge which has been read.
+   * @throws IOException on I/O errors while reading
+   * @throws InterruptedException if interrupted while reading
+   */
+  EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+      InterruptedException;
+
+  /**
+   * Close this {@link EdgeReader} to future operations.
+   *
+   * @throws IOException on I/O errors while closing
+   */
+  void close() throws IOException;
+
+  /**
+   * How much of the input this {@link EdgeReader} has consumed, i.e. how
+   * much of the split has been processed so far.
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException on I/O errors while computing the progress
+   * @throws InterruptedException if interrupted
+   */
+  float getProgress() throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
deleted file mode 100644
index ad4f2bf..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This VertexInputFormat is meant for testing/debugging. It simply generates
- * some vertex data that can be consumed by test applications.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexInputFormat<
- I extends WritableComparable, V extends Writable, E extends Writable,
- M extends Writable> extends VertexInputFormat<I, V, E, M> {
- @Override
- public List<InputSplit> getSplits(JobContext context, int numWorkers)
- throws IOException, InterruptedException {
- // This is meaningless, the VertexReader will generate all the test
- // data.
- List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
- for (int i = 0; i < numWorkers; ++i) {
- inputSplitList.add(new BspInputSplit(i, numWorkers));
- }
- return inputSplitList;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
deleted file mode 100644
index 114e75f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]
-import org.apache.hadoop.mapreduce.security.TokenCache;
-end[HADOOP_NON_SECURE]*/
-
-/**
- * Provides functionality similar to {@link FileInputFormat},
- * but allows for different data sources (vertex and edge data).
- *
- * @param <K> Key
- * @param <V> Value
- */
-public abstract class GiraphFileInputFormat<K, V>
- extends FileInputFormat<K, V> {
- /** Vertex input file paths. */
- public static final String VERTEX_INPUT_DIR = "giraph.vertex.input.dir";
- /** Edge input file paths. */
- public static final String EDGE_INPUT_DIR = "giraph.edge.input.dir";
- /** Number of vertex input files. */
- public static final String NUM_VERTEX_INPUT_FILES =
- "giraph.input.vertex.num.files";
- /** Number of edge input files. */
- public static final String NUM_EDGE_INPUT_FILES =
- "giraph.input.edge.num.files";
-
- /** Split slop. */
- private static final double SPLIT_SLOP = 1.1; // 10% slop
-
- /** Filter for hidden files. */
- private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- /** Class logger. */
- private static final Logger LOG =
- Logger.getLogger(GiraphFileInputFormat.class);
-
- /**
- * Add a {@link org.apache.hadoop.fs.Path} to the list of vertex inputs.
- *
- * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
- * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
- * vertex inputs
- */
- public static void addVertexInputPath(Job job, Path path) throws IOException {
- Configuration conf = job.getConfiguration();
- path = path.getFileSystem(conf).makeQualified(path);
- String dirStr = StringUtils.escapeString(path.toString());
- String dirs = conf.get(VERTEX_INPUT_DIR);
- conf.set(VERTEX_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
- }
-
- /**
- * Add a {@link org.apache.hadoop.fs.Path} to the list of edge inputs.
- *
- * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
- * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
- * edge inputs
- */
- public static void addEdgeInputPath(Job job, Path path) throws IOException {
- Configuration conf = job.getConfiguration();
- path = path.getFileSystem(conf).makeQualified(path);
- String dirStr = StringUtils.escapeString(path.toString());
- String dirs = conf.get(EDGE_INPUT_DIR);
- conf.set(EDGE_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
- }
-
- /**
- * Get the list of vertex input {@link Path}s.
- *
- * @param context The job
- * @return The list of input {@link Path}s
- */
- public static Path[] getVertexInputPaths(JobContext context) {
- String dirs = context.getConfiguration().get(VERTEX_INPUT_DIR, "");
- String [] list = StringUtils.split(dirs);
- Path[] result = new Path[list.length];
- for (int i = 0; i < list.length; i++) {
- result[i] = new Path(StringUtils.unEscapeString(list[i]));
- }
- return result;
- }
-
- /**
- * Get the list of edge input {@link Path}s.
- *
- * @param context The job
- * @return The list of input {@link Path}s
- */
- public static Path[] getEdgeInputPaths(JobContext context) {
- String dirs = context.getConfiguration().get(EDGE_INPUT_DIR, "");
- String [] list = StringUtils.split(dirs);
- Path[] result = new Path[list.length];
- for (int i = 0; i < list.length; i++) {
- result[i] = new Path(StringUtils.unEscapeString(list[i]));
- }
- return result;
- }
-
- /**
- * Proxy PathFilter that accepts a path only if all filters given in the
- * constructor do. Used by the listPaths() to apply the built-in
- * HIDDEN_FILE_FILTER together with a user provided one (if any).
- */
- private static class MultiPathFilter implements PathFilter {
- /** List of filters. */
- private List<PathFilter> filters;
-
- /**
- * Constructor.
- *
- * @param filters The list of filters
- */
- public MultiPathFilter(List<PathFilter> filters) {
- this.filters = filters;
- }
-
- /**
- * True iff all filters accept the given path.
- *
- * @param path The path to check
- * @return Whether the path is accepted
- */
- public boolean accept(Path path) {
- for (PathFilter filter : filters) {
- if (!filter.accept(path)) {
- return false;
- }
- }
- return true;
- }
- }
-
- /**
- * Common method for listing vertex/edge input directories.
- *
- * @param job The job
- * @param dirs list of vertex/edge input paths
- * @return Array of FileStatus objects
- * @throws IOException
- */
- private List<FileStatus> listStatus(JobContext job, Path[] dirs)
- throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]
- // get tokens for all the required FileSystems..
- TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
- job.getConfiguration());
-end[HADOOP_NON_SECURE]*/
-
- List<IOException> errors = new ArrayList<IOException>();
-
- // creates a MultiPathFilter with the HIDDEN_FILE_FILTER and the
- // user provided one (if any).
- List<PathFilter> filters = new ArrayList<PathFilter>();
- filters.add(HIDDEN_FILE_FILTER);
- PathFilter jobFilter = getInputPathFilter(job);
- if (jobFilter != null) {
- filters.add(jobFilter);
- }
- PathFilter inputFilter = new MultiPathFilter(filters);
-
- for (Path p : dirs) {
- FileSystem fs = p.getFileSystem(job.getConfiguration());
- FileStatus[] matches = fs.globStatus(p, inputFilter);
- if (matches == null) {
- errors.add(new IOException("Input path does not exist: " + p));
- } else if (matches.length == 0) {
- errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
- } else {
- for (FileStatus globStat: matches) {
- if (globStat.isDir()) {
- Collections.addAll(result, fs.listStatus(globStat.getPath()));
- } else {
- result.add(globStat);
- }
- }
- }
- }
-
- if (!errors.isEmpty()) {
- throw new InvalidInputException(errors);
- }
- LOG.info("Total input paths to process : " + result.size());
- return result;
- }
-
- /**
- * List vertex input directories.
- *
- * @param job the job to list vertex input paths for
- * @return array of FileStatus objects
- * @throws IOException if zero items.
- */
- protected List<FileStatus> listVertexStatus(JobContext job)
- throws IOException {
- return listStatus(job, getVertexInputPaths(job));
- }
-
- /**
- * List edge input directories.
- *
- * @param job the job to list edge input paths for
- * @return array of FileStatus objects
- * @throws IOException if zero items.
- */
- protected List<FileStatus> listEdgeStatus(JobContext job)
- throws IOException {
- return listStatus(job, getEdgeInputPaths(job));
- }
-
- /**
- * Common method for generating the list of vertex/edge input splits.
- *
- * @param job The job
- * @param files Array of FileStatus objects for vertex/edge input files
- * @return The list of vertex/edge input splits
- * @throws IOException
- */
- private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
- throws IOException {
- long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
- long maxSize = getMaxSplitSize(job);
-
- // generate splits
- List<InputSplit> splits = new ArrayList<InputSplit>();
-
- for (FileStatus file: files) {
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- long length = file.getLen();
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- if ((length != 0) && isSplitable(job, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-
- long bytesRemaining = length;
- while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
- bytesRemaining -= splitSize;
- }
-
- if (bytesRemaining != 0) {
- splits.add(new FileSplit(path, length - bytesRemaining,
- bytesRemaining,
- blkLocations[blkLocations.length - 1].getHosts()));
- }
- } else if (length != 0) {
- splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
- } else {
- //Create empty hosts array for zero length files
- splits.add(new FileSplit(path, 0, length, new String[0]));
- }
- }
- return splits;
- }
-
- /**
- * Generate the list of vertex input splits.
- *
- * @param job The job
- * @return The list of vertex input splits
- * @throws IOException
- */
- public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
- List<FileStatus> files = listVertexStatus(job);
- List<InputSplit> splits = getSplits(job, files);
- // Save the number of input files in the job-conf
- job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
- LOG.debug("Total # of vertex splits: " + splits.size());
- return splits;
- }
-
- /**
- * Generate the list of edge input splits.
- *
- * @param job The job
- * @return The list of edge input splits
- * @throws IOException
- */
- public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
- List<FileStatus> files = listEdgeStatus(job);
- List<InputSplit> splits = getSplits(job, files);
- // Save the number of input files in the job-conf
- job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
- LOG.debug("Total # of edge splits: " + splits.size());
- return splits;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
new file mode 100644
index 0000000..ca725ca
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
+ */
+public interface GiraphInputFormat {
+ /**
+ * Get the list of input splits for the format.
+ *
+ * @param context The job context
+ * @param numWorkers Number of workers
+ * @return The list of input splits
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ List<InputSplit> getSplits(JobContext context, int numWorkers)
+ throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
deleted file mode 100644
index 113b2ea..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
-
-/**
- * Provides functionality similar to {@link org.apache.hadoop
- * .mapreduce.lib.input.TextInputFormat}, but allows for different data
- * sources (vertex and edge data).
- */
-public class GiraphTextInputFormat
- extends GiraphFileInputFormat<LongWritable, Text> {
- @Override
- public RecordReader<LongWritable, Text>
- createRecordReader(InputSplit split, TaskAttemptContext context) {
- return new LineRecordReader();
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- CompressionCodec codec =
- new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
- return codec == null;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
deleted file mode 100644
index 53dd112..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Write out Vertices' IDs and values, but not their edges nor edges' values.
- * This is a useful output format when the final value of the vertex is
- * all that's needed. The boolean configuration parameter reverse.id.and.value
- * allows reversing the output of id and value.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class IdWithValueTextOutputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends TextVertexOutputFormat<I, V, E> {
-
- /** Specify the output delimiter */
- public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
- /** Default output delimiter */
- public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
- /** Reverse id and value order? */
- public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
- /** Default is to not reverse id and value order. */
- public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
-
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
- return new IdWithValueVertexWriter();
- }
-
- /**
- * Vertex writer used with {@link IdWithValueTextOutputFormat}.
- */
- protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
- /** Saved delimiter */
- private String delimiter;
- /** Cached reserve option */
- private boolean reverseOutput;
-
- @Override
- public void initialize(TaskAttemptContext context) throws IOException,
- InterruptedException {
- super.initialize(context);
- Configuration conf = context.getConfiguration();
- delimiter = conf
- .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
- reverseOutput = conf
- .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
- }
-
- @Override
- protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
- throws IOException {
- String first;
- String second;
- if (reverseOutput) {
- first = vertex.getValue().toString();
- second = vertex.getId().toString();
- } else {
- first = vertex.getId().toString();
- second = vertex.getValue().toString();
- }
- Text line = new Text(first + delimiter + second);
- return line;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
deleted file mode 100644
index 9aa21c7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.regex.Pattern;
-
-/**
- * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
- * unweighted graphs with int ids.
- *
- * Each line consists of: vertex neighbor1 neighbor2 ...
- */
-public class IntIntNullIntTextInputFormat extends
- TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
- IntWritable> {
- /** Separator of the vertex and neighbors */
- private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
- @Override
- public TextVertexReader createVertexReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException {
- return new IntIntNullIntVertexReader();
- }
-
- /**
- * Vertex reader associated with {@link IntIntNullIntTextInputFormat}.
- */
- public class IntIntNullIntVertexReader extends
- TextVertexReaderFromEachLineProcessed<String[]> {
- /**
- * Cached vertex id for the current line
- */
- private IntWritable id;
-
- @Override
- protected String[] preprocessLine(Text line) throws IOException {
- String[] tokens = SEPARATOR.split(line.toString());
- id = new IntWritable(Integer.parseInt(tokens[0]));
- return tokens;
- }
-
- @Override
- protected IntWritable getId(String[] tokens) throws IOException {
- return id;
- }
-
- @Override
- protected IntWritable getValue(String[] tokens) throws IOException {
- return id;
- }
-
- @Override
- protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
- String[] tokens) throws IOException {
- List<Edge<IntWritable, NullWritable>> edges =
- Lists.newArrayListWithCapacity(tokens.length - 1);
- for (int n = 1; n < tokens.length; n++) {
- edges.add(new Edge<IntWritable, NullWritable>(
- new IntWritable(Integer.parseInt(tokens[n])),
- NullWritable.get()));
- }
- return edges;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
deleted file mode 100644
index 016e74c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.utils.IntPair;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-/**
- * Simple text-based {@link org.apache.giraph.graph.VertexValueInputFormat}
- * for integer ids and values.
- *
- * Each line consists of: id, value
- *
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class IntIntTextVertexValueInputFormat<E extends Writable,
- M extends Writable> extends
- TextVertexValueInputFormat<IntWritable, IntWritable, E, M> {
- /** Separator for id and value */
- private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
- @Override
- public TextVertexValueReader createVertexValueReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new IntIntTextVertexValueReader();
- }
-
- /**
- * {@link org.apache.giraph.graph.VertexValueReader} associated with
- * {@link IntIntTextVertexValueInputFormat}.
- */
- public class IntIntTextVertexValueReader extends
- TextVertexValueReaderFromEachLineProcessed<IntPair> {
-
- @Override
- protected IntPair preprocessLine(Text line) throws IOException {
- String[] tokens = SEPARATOR.split(line.toString());
- return new IntPair(Integer.valueOf(tokens[0]),
- Integer.valueOf(tokens[1]));
- }
-
- @Override
- protected IntWritable getId(IntPair data) throws IOException {
- return new IntWritable(data.getFirst());
- }
-
- @Override
- protected IntWritable getValue(IntPair data) throws IOException {
- return new IntWritable(data.getSecond());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
deleted file mode 100644
index 4d98657..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
-
-/**
- * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
- * unweighted graphs without edges or values, just vertices with ids.
- *
- * Each line is just simply the vertex id.
- */
-public class IntNullNullNullTextInputFormat extends TextVertexInputFormat<
- IntWritable, NullWritable, NullWritable, NullWritable> {
- @Override
- public TextVertexReader createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new IntNullNullNullVertexReader();
- }
-
- /**
- * Reader for this InputFormat.
- */
- public class IntNullNullNullVertexReader extends
- TextVertexReaderFromEachLineProcessed<String> {
- /** Cached vertex id */
- private IntWritable id;
-
- @Override
- protected String preprocessLine(Text line) throws IOException {
- id = new IntWritable(Integer.parseInt(line.toString()));
- return line.toString();
- }
-
- @Override
- protected IntWritable getId(String line) throws IOException {
- return id;
- }
-
- @Override
- protected NullWritable getValue(String line) throws IOException {
- return NullWritable.get();
- }
-
- @Override
- protected Iterable<Edge<IntWritable, NullWritable>> getEdges(String line)
- throws IOException {
- return ImmutableList.of();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
deleted file mode 100644
index ed13b45..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.utils.IntPair;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-/**
- * Simple text-based {@link org.apache.giraph.graph.EdgeInputFormat} for
- * unweighted graphs with int ids.
- *
- * Each line consists of: source_vertex, target_vertex
- */
-public class IntNullTextEdgeInputFormat extends
- TextEdgeInputFormat<IntWritable, NullWritable> {
- /** Splitter for endpoints */
- private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
- @Override
- public TextEdgeReader createEdgeReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new IntNullTextEdgeReader();
- }
-
- /**
- * {@link org.apache.giraph.graph.EdgeReader} associated with
- * {@link IntNullTextEdgeInputFormat}.
- */
- public class IntNullTextEdgeReader extends
- TextEdgeReaderFromEachLineProcessed<IntPair> {
- @Override
- protected IntPair preprocessLine(Text line) throws IOException {
- String[] tokens = SEPARATOR.split(line.toString());
- return new IntPair(Integer.valueOf(tokens[0]),
- Integer.valueOf(tokens[1]));
- }
-
- @Override
- protected IntWritable getSourceVertexId(IntPair endpoints)
- throws IOException {
- return new IntWritable(endpoints.getFirst());
- }
-
- @Override
- protected IntWritable getTargetVertexId(IntPair endpoints)
- throws IOException {
- return new IntWritable(endpoints.getSecond());
- }
-
- @Override
- protected NullWritable getValue(IntPair endpoints) throws IOException {
- return NullWritable.get();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
deleted file mode 100644
index c2c1cbb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-/**
- * Keeps the vertex keys for the input/output vertex format
- */
-public class JsonBase64VertexFormat {
- /** Vertex id key */
- public static final String VERTEX_ID_KEY = "vertexId";
- /** Vertex value key*/
- public static final String VERTEX_VALUE_KEY = "vertexValue";
- /** Edge value array key (all the edges are stored here) */
- public static final String EDGE_ARRAY_KEY = "edgeArray";
-
- /**
- * Don't construct.
- */
- private JsonBase64VertexFormat() { }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
deleted file mode 100644
index cc5872c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.google.common.collect.Lists;
-import net.iharder.Base64;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Simple way to represent the structure of the graph with a JSON object.
- * The actual vertex ids, values, edges are stored by the
- * Writable serialized bytes that are Byte64 encoded.
- * Works with {@link JsonBase64VertexOutputFormat}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class JsonBase64VertexInputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends TextVertexInputFormat<I, V, E, M> {
-
- @Override
- public TextVertexReader createVertexReader(InputSplit split,
- TaskAttemptContext context) {
- return new JsonBase64VertexReader();
- }
-
- /**
- * Simple reader that supports {@link JsonBase64VertexInputFormat}
- */
- protected class JsonBase64VertexReader extends
- TextVertexReaderFromEachLineProcessed<JSONObject> {
-
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- super.initialize(inputSplit, context);
- }
-
- @Override
- protected JSONObject preprocessLine(Text line) {
- try {
- return new JSONObject(line.toString());
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "next: Failed to get the vertex", e);
- }
- }
-
- @Override
- protected I getId(JSONObject vertexObject) throws IOException {
- try {
- byte[] decodedWritable = Base64.decode(
- vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
- DataInput input = new DataInputStream(
- new ByteArrayInputStream(decodedWritable));
- I vertexId = getConf().createVertexId();
- vertexId.readFields(input);
- return vertexId;
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "next: Failed to get vertex id", e);
- }
- }
-
- @Override
- protected V getValue(JSONObject vertexObject) throws IOException {
- try {
- byte[] decodedWritable = Base64.decode(
- vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
- DataInputStream input = new DataInputStream(
- new ByteArrayInputStream(decodedWritable));
- V vertexValue = getConf().createVertexValue();
- vertexValue.readFields(input);
- return vertexValue;
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "next: Failed to get vertex value", e);
- }
- }
-
- @Override
- protected Iterable<Edge<I, E>> getEdges(JSONObject vertexObject) throws
- IOException {
- JSONArray edgeArray = null;
- try {
- edgeArray = vertexObject.getJSONArray(
- JsonBase64VertexFormat.EDGE_ARRAY_KEY);
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "next: Failed to get edge array", e);
- }
- byte[] decodedWritable;
- List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(
- edgeArray.length());
- for (int i = 0; i < edgeArray.length(); ++i) {
- try {
- decodedWritable = Base64.decode(edgeArray.getString(i));
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "next: Failed to get edge value", e);
- }
- DataInputStream input = new DataInputStream(
- new ByteArrayInputStream(decodedWritable));
- I targetVertexId = getConf().createVertexId();
- targetVertexId.readFields(input);
- E edgeValue = getConf().createEdgeValue();
- edgeValue.readFields(input);
- edges.add(new Edge<I, E>(targetVertexId, edgeValue));
- }
- return edges;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
deleted file mode 100644
index eb3c9ac..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import net.iharder.Base64;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * Simple way to represent the structure of the graph with a JSON object.
- * The actual vertex ids, values, edges are stored by the
- * Writable serialized bytes that are Byte64 encoded.
- * Works with {@link JsonBase64VertexInputFormat}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class JsonBase64VertexOutputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable> extends
- TextVertexOutputFormat<I, V, E> {
-
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
- return new JsonBase64VertexWriter();
- }
-
- /**
- * Simple writer that supports {@link JsonBase64VertexOutputFormat}
- */
- protected class JsonBase64VertexWriter extends TextVertexWriterToEachLine {
-
- @Override
- protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
- throws IOException {
- ByteArrayOutputStream outputStream =
- new ByteArrayOutputStream();
- DataOutput output = new DataOutputStream(outputStream);
- JSONObject vertexObject = new JSONObject();
- vertex.getId().write(output);
- try {
- vertexObject.put(
- JsonBase64VertexFormat.VERTEX_ID_KEY,
- Base64.encodeBytes(outputStream.toByteArray()));
- } catch (JSONException e) {
- throw new IllegalStateException(
- "writerVertex: Failed to insert vertex id", e);
- }
- outputStream.reset();
- vertex.getValue().write(output);
- try {
- vertexObject.put(
- JsonBase64VertexFormat.VERTEX_VALUE_KEY,
- Base64.encodeBytes(outputStream.toByteArray()));
- } catch (JSONException e) {
- throw new IllegalStateException(
- "writerVertex: Failed to insert vertex value", e);
- }
- JSONArray edgeArray = new JSONArray();
- for (Edge<I, E> edge : vertex.getEdges()) {
- outputStream.reset();
- edge.getTargetVertexId().write(output);
- edge.getValue().write(output);
- edgeArray.put(Base64.encodeBytes(outputStream.toByteArray()));
- }
- try {
- vertexObject.put(
- JsonBase64VertexFormat.EDGE_ARRAY_KEY,
- edgeArray);
- } catch (JSONException e) {
- throw new IllegalStateException(
- "writerVertex: Failed to insert edge array", e);
- }
- return new Text(vertexObject.toString());
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
deleted file mode 100644
index c01d442..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.json.JSONArray;
-import org.json.JSONException;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * VertexInputFormat that features <code>long</code> vertex ID's,
- * <code>double</code> vertex values and <code>float</code>
- * out-edge weights, and <code>double</code> message types,
- * specified in JSON format.
- */
-public class JsonLongDoubleFloatDoubleVertexInputFormat extends
- TextVertexInputFormat<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> {
-
- @Override
- public TextVertexReader createVertexReader(InputSplit split,
- TaskAttemptContext context) {
- return new JsonLongDoubleFloatDoubleVertexReader();
- }
-
- /**
- * VertexReader that features <code>double</code> vertex
- * values and <code>float</code> out-edge weights. The
- * files should be in the following JSON format:
- * JSONArray(<vertex id>, <vertex value>,
- * JSONArray(JSONArray(<dest vertex id>, <edge value>), ...))
- * Here is an example with vertex id 1, vertex value 4.3, and two edges.
- * First edge has a destination vertex 2, edge value 2.1.
- * Second edge has a destination vertex 3, edge value 0.7.
- * [1,4.3,[[2,2.1],[3,0.7]]]
- */
- class JsonLongDoubleFloatDoubleVertexReader extends
- TextVertexReaderFromEachLineProcessedHandlingExceptions<JSONArray,
- JSONException> {
-
- @Override
- protected JSONArray preprocessLine(Text line) throws JSONException {
- return new JSONArray(line.toString());
- }
-
- @Override
- protected LongWritable getId(JSONArray jsonVertex) throws JSONException,
- IOException {
- return new LongWritable(jsonVertex.getLong(0));
- }
-
- @Override
- protected DoubleWritable getValue(JSONArray jsonVertex) throws
- JSONException, IOException {
- return new DoubleWritable(jsonVertex.getDouble(1));
- }
-
- @Override
- protected Iterable<Edge<LongWritable, FloatWritable>> getEdges(
- JSONArray jsonVertex) throws JSONException, IOException {
- JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
- List<Edge<LongWritable, FloatWritable>> edges =
- Lists.newArrayListWithCapacity(jsonEdgeArray.length());
- for (int i = 0; i < jsonEdgeArray.length(); ++i) {
- JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
- edges.add(new Edge<LongWritable, FloatWritable>(
- new LongWritable(jsonEdge.getLong(0)),
- new FloatWritable((float) jsonEdge.getDouble(1))));
- }
- return edges;
- }
-
- @Override
- protected Vertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> handleException(Text line, JSONArray jsonVertex,
- JSONException e) {
- throw new IllegalArgumentException(
- "Couldn't get vertex from line " + line, e);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
deleted file mode 100644
index 2ab7032..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.json.JSONArray;
-import org.json.JSONException;
-
-import java.io.IOException;
-
-/**
- * VertexOutputFormat that supports JSON encoded vertices featuring
- * <code>double</code> values and <code>float</code> out-edge weights
- */
-public class JsonLongDoubleFloatDoubleVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, DoubleWritable,
- FloatWritable> {
-
- @Override
- public TextVertexWriter createVertexWriter(
- TaskAttemptContext context) {
- return new JsonLongDoubleFloatDoubleVertexWriter();
- }
-
- /**
- * VertexWriter that supports vertices with <code>double</code>
- * values and <code>float</code> out-edge weights.
- */
- private class JsonLongDoubleFloatDoubleVertexWriter extends
- TextVertexWriterToEachLine {
- @Override
- public Text convertVertexToLine(
- Vertex<LongWritable, DoubleWritable,
- FloatWritable, ?> vertex
- ) throws IOException {
- JSONArray jsonVertex = new JSONArray();
- try {
- jsonVertex.put(vertex.getId().get());
- jsonVertex.put(vertex.getValue().get());
- JSONArray jsonEdgeArray = new JSONArray();
- for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
- JSONArray jsonEdge = new JSONArray();
- jsonEdge.put(edge.getTargetVertexId().get());
- jsonEdge.put(edge.getValue().get());
- jsonEdgeArray.put(jsonEdge);
- }
- jsonVertex.put(jsonEdgeArray);
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "writeVertex: Couldn't write vertex " + vertex);
- }
- return new Text(jsonVertex.toString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
deleted file mode 100644
index acae10b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * InputFormat for reading graphs stored as (ordered) adjacency lists
- * with the vertex ids longs and the vertex values and edges doubles.
- * For example:
- * 22 0.1 45 0.3 99 0.44
- * to repesent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99,
- * with values of 0.3 and 0.44, respectively.
- *
- * @param <M> Message data
- */
-public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
- extends AdjacencyListTextVertexInputFormat<LongWritable, DoubleWritable,
- DoubleWritable, M> {
-
- @Override
- public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
- TaskAttemptContext context) {
- return new LongDoubleDoubleAdjacencyListVertexReader(null);
- }
-
- /**
- * VertexReader associated with
- * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}.
- */
- protected class LongDoubleDoubleAdjacencyListVertexReader extends
- AdjacencyListTextVertexReader {
-
- /**
- * Constructor with {@link LineSanitizer}.
- *
- * @param lineSanitizer the sanitizer to use for reading
- */
- public LongDoubleDoubleAdjacencyListVertexReader(LineSanitizer
- lineSanitizer) {
- super(lineSanitizer);
- }
-
- @Override
- public LongWritable decodeId(String s) {
- return new LongWritable(Long.valueOf(s));
- }
-
- @Override
- public DoubleWritable decodeValue(String s) {
- return new DoubleWritable(Double.valueOf(s));
- }
-
- @Override
- public Edge<LongWritable, DoubleWritable> decodeEdge(
- String s1,
- String s2) {
- return new Edge<LongWritable, DoubleWritable>(
- new LongWritable(Long.valueOf(s1)),
- new DoubleWritable(Double.valueOf(s2)));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomEdgeInputFormat.java
deleted file mode 100644
index 3311691..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomEdgeInputFormat.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import com.google.common.collect.Sets;
-import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeInputFormat;
-import org.apache.giraph.graph.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * This {@link EdgeInputFormat} generates pseudo-random edges on the fly.
- * As with {@link PseudoRandomVertexInputFormat}, the user specifies the
- * number of vertices and the number of edges per vertex.
- */
-public class PseudoRandomEdgeInputFormat
- extends EdgeInputFormat<LongWritable, DoubleWritable> {
- /** Set the number of aggregate vertices. */
- public static final String AGGREGATE_VERTICES =
- "pseudoRandomEdgeInputFormat.aggregateVertices";
- /** Set the number of edges per vertex (pseudo-random destination). */
- public static final String EDGES_PER_VERTEX =
- "pseudoRandomEdgeInputFormat.edgesPerVertex";
-
- @Override
- public final List<InputSplit> getSplits(final JobContext context,
- final int numWorkers)
- throws IOException, InterruptedException {
- // This is meaningless, the PseudoRandomEdgeReader will generate
- // all the test data
- List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
- for (int i = 0; i < numWorkers; ++i) {
- inputSplitList.add(new BspInputSplit(i, numWorkers));
- }
- return inputSplitList;
- }
-
- @Override
- public EdgeReader<LongWritable, DoubleWritable> createEdgeReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new PseudoRandomEdgeReader();
- }
-
- /**
- * {@link EdgeReader} that generates pseudo-random edges.
- */
- private static class PseudoRandomEdgeReader
- implements EdgeReader<LongWritable, DoubleWritable> {
- /** Logger. */
- private static final Logger LOG =
- Logger.getLogger(PseudoRandomEdgeReader.class);
- /** Starting vertex id. */
- private long startingVertexId = -1;
- /** Vertices read so far. */
- private long verticesRead = 0;
- /** Total vertices to read (on this split alone). */
- private long totalSplitVertices = -1;
- /** Current vertex id. */
- private LongWritable currentVertexId = new LongWritable(-1);
- /** Edges read for the current vertex. */
- private int currentVertexEdgesRead = 0;
- /** Target vertices of edges for current vertex. */
- private Set<LongWritable> currentVertexDestVertices = Sets.newHashSet();
- /** Random number generator for the current vertex (for consistency
- * across runs on different numbers of workers). */
- private Random random = new Random();
- /** Aggregate vertices (all input splits). */
- private long aggregateVertices = -1;
- /** Edges per vertex. */
- private long edgesPerVertex = -1;
- /** BspInputSplit (used only for index). */
- private BspInputSplit bspInputSplit;
- /** Saved configuration */
- private ImmutableClassesGiraphConfiguration configuration;
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- configuration = new ImmutableClassesGiraphConfiguration(
- context.getConfiguration());
- aggregateVertices =
- configuration.getLong(
- PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES, 0);
- if (aggregateVertices <= 0) {
- throw new IllegalArgumentException(
- PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES + " <= 0");
- }
- if (inputSplit instanceof BspInputSplit) {
- bspInputSplit = (BspInputSplit) inputSplit;
- long extraVertices =
- aggregateVertices % bspInputSplit.getNumSplits();
- totalSplitVertices =
- aggregateVertices / bspInputSplit.getNumSplits();
- if (bspInputSplit.getSplitIndex() < extraVertices) {
- ++totalSplitVertices;
- }
- startingVertexId = (bspInputSplit.getSplitIndex() *
- (aggregateVertices / bspInputSplit.getNumSplits())) +
- Math.min(bspInputSplit.getSplitIndex(),
- extraVertices);
- } else {
- throw new IllegalArgumentException(
- "initialize: Got " + inputSplit.getClass() +
- " instead of " + BspInputSplit.class);
- }
- edgesPerVertex = configuration.getLong(
- PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, 0);
- if (edgesPerVertex <= 0) {
- throw new IllegalArgumentException(
- PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX + " <= 0");
- }
- }
-
- @Override
- public boolean nextEdge() throws IOException, InterruptedException {
- return totalSplitVertices > verticesRead + 1 ||
- (totalSplitVertices == verticesRead + 1 &&
- edgesPerVertex > currentVertexEdgesRead);
- }
-
- @Override
- public EdgeWithSource<LongWritable, DoubleWritable> getCurrentEdge()
- throws IOException, InterruptedException {
- if (currentVertexEdgesRead == edgesPerVertex) {
- ++verticesRead;
- currentVertexId = new LongWritable(-1);
- }
-
- if (currentVertexId.get() == -1) {
- currentVertexId.set(startingVertexId + verticesRead);
- currentVertexEdgesRead = 0;
- // Seed on the vertex id to keep the vertex data the same when
- // on different number of workers, but other parameters are the
- // same.
- random.setSeed(currentVertexId.get());
- currentVertexDestVertices.clear();
- }
-
- LongWritable destVertexId;
- do {
- destVertexId =
- new LongWritable(Math.abs(random.nextLong()) %
- aggregateVertices);
- } while (currentVertexDestVertices.contains(destVertexId));
- ++currentVertexEdgesRead;
- currentVertexDestVertices.add(destVertexId);
- if (LOG.isTraceEnabled()) {
- LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
- "" + destVertexId + ")");
- }
- return new EdgeWithSource<LongWritable, DoubleWritable>(
- currentVertexId,
- new Edge<LongWritable, DoubleWritable>(
- destVertexId,
- new DoubleWritable(random.nextDouble())));
- }
-
- @Override
- public void close() throws IOException { }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return (verticesRead * edgesPerVertex + currentVertexEdgesRead) *
- 100.0f / (totalSplitVertices * edgesPerVertex);
- }
- }
-}