You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/11/18 16:00:25 UTC
[1/2] git commit: updated refs/heads/trunk to f7c3025
Updated Branches:
refs/heads/trunk e987492ee -> f7c302587
GIRAPH-760
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b151d7a9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b151d7a9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b151d7a9
Branch: refs/heads/trunk
Commit: b151d7a97f8f06451a915668e01f393ad0445b36
Parents: e987492
Author: Claudio Martella <cl...@gmail.com>
Authored: Mon Nov 18 15:59:15 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Mon Nov 18 15:59:15 2013 +0100
----------------------------------------------------------------------
.../giraph/io/gora/GoraEdgeOutputFormat.java | 281 +++++++++++++++++
.../io/gora/GoraGEdgeEdgeOutputFormat.java | 76 +++++
.../giraph/io/gora/generated/GEdgeResult.java | 314 +++++++++++++++++++
.../io/gora/GoraTestEdgeOutputFormat.java | 119 +++++++
.../io/gora/TestGoraEdgeOutputFormat.java | 93 ++++++
5 files changed, 883 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java
new file mode 100644
index 0000000..be9f472
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
+import org.apache.giraph.io.gora.utils.GoraUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.log4j.Logger;
+
+/**
+ * Class which wraps the GoraInputFormat. It's designed
+ * as an extension point to EdgeOutputFormat subclasses who wish
+ * to write to Gora data sources.
+ *
+ * Works with
+ * {@link GoraEdgeInputFormat}
+ *
+ * @param <I> edge id type
+ * @param <V> vertex type
+ * @param <E> edge type
+ */
+public abstract class GoraEdgeOutputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends EdgeOutputFormat<I, V, E> {
+
+ /** Logger for Gora's vertex input format. */
+ private static final Logger LOG =
+ Logger.getLogger(GoraEdgeOutputFormat.class);
+
+ /** KeyClass used for getting data. */
+ private static Class<?> KEY_CLASS;
+
+ /** The vertex itself will be used as a value inside Gora. */
+ private static Class<? extends Persistent> PERSISTENT_CLASS;
+
+ /** Data store class to be used as backend. */
+ private static Class<? extends DataStore> DATASTORE_CLASS;
+
+ /** Data store used for querying data. */
+ private static DataStore DATA_STORE;
+
+ /**
+ * checkOutputSpecs
+ *
+ * @param context information about the job
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ }
+
+ /**
+ * Gets the data store object initialized.
+ * @return DataStore created
+ */
+ public DataStore createDataStore() {
+ DataStore dsCreated = null;
+ try {
+ dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(),
+ getKeyClass(), getPersistentClass());
+ } catch (GoraException e) {
+ getLogger().error("Error creating data store.");
+ e.printStackTrace();
+ }
+ return dsCreated;
+ }
+
+ @Override
+ public abstract GoraEdgeWriter
+ createEdgeWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException;
+
+ /**
+ * getOutputCommitter
+ *
+ * @param context the task context
+ * @return OutputCommitter
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new NullOutputCommitter();
+ }
+
+ /**
+ * Empty output commiter for hadoop.
+ */
+ private static class NullOutputCommitter extends OutputCommitter {
+ @Override
+ public void abortTask(TaskAttemptContext arg0) throws IOException { }
+
+ @Override
+ public void commitTask(TaskAttemptContext arg0) throws IOException { }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setupJob(JobContext arg0) throws IOException { }
+
+ @Override
+ public void setupTask(TaskAttemptContext arg0) throws IOException { }
+ }
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex/edges output.
+ */
+ protected abstract class GoraEdgeWriter extends EdgeWriter<I, V, E> {
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ String sDataStoreType =
+ GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
+ String sKeyType =
+ GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
+ String sPersistentType =
+ GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
+ try {
+ Class<?> keyClass = Class.forName(sKeyType);
+ Class<?> persistentClass = Class.forName(sPersistentType);
+ Class<?> dataStoreClass = Class.forName(sDataStoreType);
+ setKeyClass(keyClass);
+ setPersistentClass((Class<? extends Persistent>) persistentClass);
+ setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
+ setDataStore(createDataStore());
+ if (getDataStore() != null) {
+ getLogger().debug("The data store has been created.");
+ }
+ } catch (ClassNotFoundException e) {
+ getLogger().error("Error while reading Gora Output parameters");
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ getDataStore().flush();
+ getDataStore().close();
+ }
+
+ @Override
+ public void writeEdge(I srcId, V srcValue, Edge<I, E> edge)
+ throws IOException, InterruptedException {
+ Persistent goraEdge = null;
+ Object goraKey = getGoraKey(srcId, srcValue, edge);
+ goraEdge = getGoraEdge(srcId, srcValue, edge);
+ getDataStore().put(goraKey, goraEdge);
+ }
+
+ /**
+ * Each edge needs to be transformed into a Gora object to be sent to
+ * a specific data store.
+ *
+ * @param edge edge to be transformed into a Gora object
+ * @param srcId source vertex id
+ * @param srcValue source vertex value
+ * @return Gora representation of the vertex
+ */
+ protected abstract Persistent getGoraEdge
+ (I srcId, V srcValue, Edge<I, E> edge);
+
+ /**
+ * Gets the correct key from a computed vertex.
+ * @param edge edge to extract the key from.
+ * @param srcId source vertex id
+ * @param srcValue source vertex value
+ * @return The key representing such edge.
+ */
+ protected abstract Object getGoraKey(I srcId, V srcValue, Edge<I, E> edge);
+ }
+
+ /**
+ * Gets the data store.
+ * @return DataStore
+ */
+ public static DataStore getDataStore() {
+ return DATA_STORE;
+ }
+
+ /**
+ * Sets the data store
+ * @param dStore the dATA_STORE to set
+ */
+ public static void setDataStore(DataStore dStore) {
+ DATA_STORE = dStore;
+ }
+
+ /**
+ * Gets the persistent Class
+ * @return persistentClass used
+ */
+ static Class<? extends Persistent> getPersistentClass() {
+ return PERSISTENT_CLASS;
+ }
+
+ /**
+ * Sets the persistent Class
+ * @param persistentClassUsed to be set
+ */
+ static void setPersistentClass
+ (Class<? extends Persistent> persistentClassUsed) {
+ PERSISTENT_CLASS = persistentClassUsed;
+ }
+
+ /**
+ * Gets the key class used.
+ * @return the key class used.
+ */
+ static Class<?> getKeyClass() {
+ return KEY_CLASS;
+ }
+
+ /**
+ * Sets the key class used.
+ * @param keyClassUsed key class used.
+ */
+ static void setKeyClass(Class<?> keyClassUsed) {
+ KEY_CLASS = keyClassUsed;
+ }
+
+ /**
+ * @return Class the DATASTORE_CLASS
+ */
+ public static Class<? extends DataStore> getDatastoreClass() {
+ return DATASTORE_CLASS;
+ }
+
+ /**
+ * @param dataStoreClass the dataStore class to set
+ */
+ public static void setDatastoreClass(
+ Class<? extends DataStore> dataStoreClass) {
+ DATASTORE_CLASS = dataStoreClass;
+ }
+
+ /**
+ * Gets the logger for the class.
+ * @return the log of the class.
+ */
+ public static Logger getLogger() {
+ return LOG;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java
new file mode 100644
index 0000000..d350d37
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.gora.generated.GEdgeResult;
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Edge output format that stores every edge as a {@link GEdgeResult}.
+ *
+ * <p>Each edge is stored under the key {@code "<srcId>-<targetId>"}. The
+ * same string is used as the edge id, vertexInId holds the source vertex
+ * id, vertexOutId holds the target vertex id, and edgeWeight holds the edge
+ * value.</p>
+ */
+public class GoraGEdgeEdgeOutputFormat
+    extends GoraEdgeOutputFormat<LongWritable, DoubleWritable,
+    FloatWritable> {
+
+  /**
+   * Default constructor
+   */
+  public GoraGEdgeEdgeOutputFormat() {
+  }
+
+  @Override
+  public GoraEdgeWriter createEdgeWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new GoraGEdgeEdgeWriter();
+  }
+
+  /**
+   * Gora edge writer producing GEdgeResult objects.
+   */
+  protected class GoraGEdgeEdgeWriter
+      extends GoraEdgeWriter {
+
+    @Override
+    protected Persistent getGoraEdge(LongWritable srcId,
+        DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
+      GEdgeResult tmpGEdge = new GEdgeResult();
+      tmpGEdge.setEdgeId(new Utf8(edgeKey(srcId, edge)));
+      tmpGEdge.setEdgeWeight(edge.getValue().get());
+      tmpGEdge.setVertexInId(new Utf8(srcId.toString()));
+      tmpGEdge.setVertexOutId(new Utf8(edge.getTargetVertexId().toString()));
+      if (getLogger().isDebugEnabled()) {
+        getLogger().debug("GoraObject created: " + tmpGEdge.toString());
+      }
+      return tmpGEdge;
+    }
+
+    /**
+     * Builds the key from the source and target vertex ids. The previous
+     * key was {@code target + weight}, computed as a float sum, which let
+     * distinct edges collide and overwrite each other.
+     */
+    @Override
+    protected Object getGoraKey(LongWritable srcId,
+        DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
+      return edgeKey(srcId, edge);
+    }
+
+    /**
+     * Builds the textual id of an edge.
+     *
+     * @param srcId source vertex id
+     * @param edge edge leaving the source vertex
+     * @return {@code "<srcId>-<targetId>"}
+     */
+    private String edgeKey(LongWritable srcId,
+        Edge<LongWritable, FloatWritable> edge) {
+      return srcId.get() + "-" + edge.getTargetVertexId().get();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java
new file mode 100644
index 0000000..0c3501c
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.gora.generated;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+
+/**
+ * Example Gora persistent class for defining a Giraph edge, with the
+ * fields edgeId, edgeWeight, vertexInId, vertexOutId and label.
+ * NOTE(review): this looks like Gora/Avro-generated code (package
+ * "generated"). Prefer regenerating to hand-editing, and keep the indices
+ * used in get/put, ALL_FIELDS and the Field enum in the same order as
+ * OBJ_SCHEMA.
+ */
+@SuppressWarnings("all")
+public class GEdgeResult extends PersistentBase {
+ /**
+ * Schema used for the class.
+ * NOTE(review): the record is named "GEdge" in namespace
+ * "org.apache.giraph.gora.generated", not after this class or its package.
+ * This is presumably deliberate; confirm before renaming it.
+ */
+ public static final Schema OBJ_SCHEMA = Schema.parse("{\"type\":\"record\"," +
+ "\"name\":\"GEdge\",\"namespace\":\"org.apache.giraph.gora.generated\"," +
+ "\"fields\":[{\"name\":\"edgeId\",\"type\":\"string\"}," +
+ "{\"name\":\"edgeWeight\",\"type\":\"float\"}," +
+ "{\"name\":\"vertexInId\",\"type\":\"string\"}," +
+ "{\"name\":\"vertexOutId\",\"type\":\"string\"}," +
+ "{\"name\":\"label\",\"type\":\"string\"}]}");
+
+ /**
+ * Fields of the record, each with its schema index and name.
+ */
+ public static enum Field {
+ /**
+ * Edge id.
+ */
+ EDGE_ID(0, "edgeId"),
+
+ /**
+ * Edge weight.
+ */
+ EDGE_WEIGHT(1, "edgeWeight"),
+
+ /**
+ * Edge vertex source id.
+ */
+ VERTEX_IN_ID(2, "vertexInId"),
+
+ /**
+ * Edge vertex end id.
+ */
+ VERTEX_OUT_ID(3, "vertexOutId"),
+
+ /**
+ * Edge label.
+ */
+ LABEL(4, "label");
+
+ /**
+ * Field index
+ */
+ private int index;
+
+ /**
+ * Field name
+ */
+ private String name;
+
+ /**
+ * Field constructor
+ * @param index of attribute
+ * @param name of attribute
+ */
+ Field(int index, String name) {
+ this.index = index;
+ this.name = name;
+ }
+
+ /**
+ * Gets index
+ * @return int of attribute.
+ */
+ public int getIndex() {
+ return index;
+ }
+
+ /**
+ * Gets name
+ * @return String of name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the field name.
+ * @return String of name.
+ */
+ public String toString() {
+ return name;
+ }
+ };
+
+ /**
+ * Array containing all field names, in schema order.
+ */
+ private static final String[] ALL_FIELDS = {
+ "edgeId", "edgeWeight", "vertexInId", "vertexOutId", "label"
+ };
+
+ // Registers the field names of this class with PersistentBase.
+ static {
+ PersistentBase.registerFields(GEdgeResult.class, ALL_FIELDS);
+ }
+
+ /**
+ * edgeId
+ */
+ private Utf8 edgeId;
+
+ /**
+ * edgeWeight
+ */
+ private float edgeWeight;
+
+ /**
+ * vertexInId
+ */
+ private Utf8 vertexInId;
+
+ /**
+ * vertexOutId
+ */
+ private Utf8 vertexOutId;
+
+ /**
+ * label
+ */
+ private Utf8 label;
+
+ /**
+ * Default constructor; uses a new StateManagerImpl.
+ */
+ public GEdgeResult() {
+ this(new StateManagerImpl());
+ }
+
+ /**
+ * Constructor
+ * @param stateManager from which the object will be created.
+ */
+ public GEdgeResult(StateManager stateManager) {
+ super(stateManager);
+ }
+
+ /**
+ * Creates a new instance
+ * @param stateManager from which the object will be created.
+ * @return GEdgeResult created
+ */
+ public GEdgeResult newInstance(StateManager stateManager) {
+ return new GEdgeResult(stateManager);
+ }
+
+ /**
+ * Gets the object schema
+ * @return Schema of the object.
+ */
+ public Schema getSchema() {
+ return OBJ_SCHEMA;
+ }
+
+ /**
+ * Gets the value of a field by its schema index.
+ * @param fieldIndex index field.
+ * @return Object from an index (edgeWeight is boxed to a Float).
+ * @throws AvroRuntimeException if the index is not between 0 and 4.
+ */
+ public Object get(int fieldIndex) {
+ switch (fieldIndex) {
+ case 0:
+ return edgeId;
+ case 1:
+ return edgeWeight;
+ case 2:
+ return vertexInId;
+ case 3:
+ return vertexOutId;
+ case 4:
+ return label;
+ default:
+ throw new AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Sets a field by its schema index and marks that field dirty in the
+ * state manager. Returns without changes when isFieldEqual reports the
+ * value as equal to the current one.
+ * @param fieldIndex index of field used.
+ * @param fieldValue value of field used (a Utf8, or a Float for index 1).
+ * @throws AvroRuntimeException if the index is not between 0 and 4.
+ */
+ @SuppressWarnings(value = "unchecked")
+ public void put(int fieldIndex, Object fieldValue) {
+ if (isFieldEqual(fieldIndex, fieldValue)) {
+ return;
+ }
+ getStateManager().setDirty(this, fieldIndex);
+ switch (fieldIndex) {
+ case 0:
+ edgeId = (Utf8) fieldValue; break;
+ case 1:
+ edgeWeight = (Float) fieldValue; break;
+ case 2:
+ vertexInId = (Utf8) fieldValue; break;
+ case 3:
+ vertexOutId = (Utf8) fieldValue; break;
+ case 4:
+ label = (Utf8) fieldValue; break;
+ default:
+ throw new AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets edgeId
+ * @return Utf8 edgeId
+ */
+ public Utf8 getEdgeId() {
+ return (Utf8) get(0);
+ }
+
+ /**
+ * Sets edgeId
+ * @param value edgeId
+ */
+ public void setEdgeId(Utf8 value) {
+ put(0, value);
+ }
+
+ /**
+ * Gets edgeWeight
+ * @return float edgeWeight
+ */
+ public float getEdgeWeight() {
+ return (Float) get(1);
+ }
+
+ /**
+ * Sets edgeWeight
+ * @param value edgeWeight
+ */
+ public void setEdgeWeight(float value) {
+ put(1, value);
+ }
+
+ /**
+ * Gets vertexInId (the edge's source vertex id)
+ * @return Utf8 vertexInId
+ */
+ public Utf8 getVertexInId() {
+ return (Utf8) get(2);
+ }
+
+ /**
+ * Sets vertexInId (the edge's source vertex id)
+ * @param value vertexInId
+ */
+ public void setVertexInId(Utf8 value) {
+ put(2, value);
+ }
+
+ /**
+ * Gets vertexOutId (the edge's end vertex id)
+ * @return Utf8 vertexOutId
+ */
+ public Utf8 getVertexOutId() {
+ return (Utf8) get(3);
+ }
+
+ /**
+ * Sets vertexOutId (the edge's end vertex id)
+ * @param value vertexOutId
+ */
+ public void setVertexOutId(Utf8 value) {
+ put(3, value);
+ }
+
+ /**
+ * Gets label
+ * @return Utf8 label
+ */
+ public Utf8 getLabel() {
+ return (Utf8) get(4);
+ }
+
+ /**
+ * Sets label
+ * @param value label
+ */
+ public void setLabel(Utf8 value) {
+ put(4, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java
new file mode 100644
index 0000000..0254498
--- /dev/null
+++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.gora.GoraEdgeOutputFormat;
+import org.apache.giraph.io.gora.generated.GEdge;
+import org.apache.giraph.io.gora.generated.GEdgeResult;
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Test edge output format. It writes each edge as a {@link GEdgeResult}
+ * stored under the key {@code "<srcId>-<targetId>"}. It then asserts that
+ * the stored object can be read back with the expected contents.
+ */
+public class GoraTestEdgeOutputFormat
+    extends GoraEdgeOutputFormat<LongWritable, DoubleWritable,
+    FloatWritable> {
+
+  /**
+   * Default constructor
+   */
+  public GoraTestEdgeOutputFormat() {
+  }
+
+  @Override
+  public GoraEdgeWriter createEdgeWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new GoraGEdgeEdgeWriter();
+  }
+
+  /**
+   * Gora edge writer that checks every edge after writing it.
+   */
+  protected class GoraGEdgeEdgeWriter
+      extends GoraEdgeWriter {
+
+    @Override
+    protected Persistent getGoraEdge(LongWritable srcId,
+        DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
+      GEdgeResult tmpGEdge = new GEdgeResult();
+      Utf8 keyLabel = new Utf8(edgeLabel(srcId, edge));
+      tmpGEdge.setEdgeId(keyLabel);
+      tmpGEdge.setEdgeWeight(edge.getValue().get());
+      tmpGEdge.setVertexInId(new Utf8(srcId.toString()));
+      tmpGEdge.setVertexOutId(new Utf8(edge.getTargetVertexId().toString()));
+      tmpGEdge.setLabel(keyLabel);
+      getLogger().debug("GoraObject created: " + tmpGEdge.toString());
+      return tmpGEdge;
+    }
+
+    /**
+     * Uses the source and target ids as the key. Summing the target id
+     * with the weight (the previous key) lets distinct edges collide.
+     */
+    @Override
+    protected Object getGoraKey(LongWritable srcId,
+        DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
+      return edgeLabel(srcId, edge);
+    }
+
+    @Override
+    public void writeEdge(LongWritable srcId, DoubleWritable srcValue,
+        Edge<LongWritable, FloatWritable> edge)
+        throws IOException, InterruptedException {
+      super.writeEdge(srcId, srcValue, edge);
+      Object goraKey = getGoraKey(srcId, srcValue, edge);
+      String keyLabel = edgeLabel(srcId, edge);
+      // NOTE(review): assumes the test input gives every edge the weight
+      // srcId + targetId; confirm against the input format used.
+      float weight = Float.valueOf(srcId.toString()) +
+          Float.valueOf(edge.getTargetVertexId().toString());
+      // Asserting
+      Assert.assertEquals(createEdge(keyLabel, String.valueOf(srcId),
+          String.valueOf(edge.getTargetVertexId()), keyLabel, weight),
+          getDataStore().get(goraKey));
+    }
+
+    /**
+     * Builds the label/key of an edge.
+     *
+     * @param srcId source vertex id
+     * @param edge edge leaving the source vertex
+     * @return {@code "<srcId>-<targetId>"}
+     */
+    private String edgeLabel(LongWritable srcId,
+        Edge<LongWritable, FloatWritable> edge) {
+      return srcId.get() + "-" + edge.getTargetVertexId().get();
+    }
+
+    /**
+     * Creates the expected edge object.
+     * @param id Edge id.
+     * @param vertexInId Vertex source Id.
+     * @param vertexOutId Vertex destination Id.
+     * @param edgeLabel Edge label.
+     * @param edgeWeight Edge weight.
+     * @return GEdge created.
+     */
+    private GEdge createEdge(String id, String vertexInId,
+        String vertexOutId, String edgeLabel, float edgeWeight) {
+      GEdge newEdge = new GEdge();
+      newEdge.setEdgeId(new Utf8(id));
+      newEdge.setVertexInId(new Utf8(vertexInId));
+      newEdge.setVertexOutId(new Utf8(vertexOutId));
+      newEdge.setLabel(new Utf8(edgeLabel));
+      newEdge.setEdgeWeight(edgeWeight);
+      return newEdge;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java
new file mode 100644
index 0000000..c9ac38a
--- /dev/null
+++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for Gora edge output formats.
+ */
+public class TestGoraEdgeOutputFormat {
+
+  /**
+   * Runs a job that reads edges from an in-memory Gora store through
+   * GoraTestEdgeInputFormat. It writes them back through
+   * {@link GoraTestEdgeOutputFormat}, whose writer asserts each stored edge.
+   *
+   * @throws Exception if the job fails
+   */
+  @Test
+  public void getWritingDb() throws Exception {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    // Parameters for input
+    GIRAPH_GORA_DATASTORE_CLASS.
+        set(conf, "org.apache.gora.memory.store.MemStore");
+    GIRAPH_GORA_KEYS_FACTORY_CLASS.
+        set(conf, "org.apache.giraph.io.gora.utils.DefaultKeyFactory");
+    GIRAPH_GORA_KEY_CLASS.set(conf, "java.lang.String");
+    GIRAPH_GORA_PERSISTENT_CLASS.
+        set(conf, "org.apache.giraph.io.gora.generated.GEdge");
+    GIRAPH_GORA_START_KEY.set(conf, "1");
+    GIRAPH_GORA_END_KEY.set(conf, "4");
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.WritableSerialization," +
+        "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.setComputationClass(EmptyComputation.class);
+    conf.setEdgeInputFormatClass(GoraTestEdgeInputFormat.class);
+    // Parameters for output
+    GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.
+        set(conf, "org.apache.gora.memory.store.MemStore");
+    GIRAPH_GORA_OUTPUT_KEY_CLASS.set(conf, "java.lang.String");
+    GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.
+        set(conf, "org.apache.giraph.io.gora.generated.GEdge");
+    conf.setEdgeOutputFormatClass(GoraTestEdgeOutputFormat.class);
+    Iterable<String> results =
+        InternalVertexRunner.run(conf, new String[0], new String[0]);
+    Assert.assertNotNull(results);
+  }
+
+  /**
+   * Computation that only checks that the vertex is non-null and votes to
+   * halt. It leaves the graph unchanged, so the edges read as input are the
+   * ones written as output.
+   */
+  public static class EmptyComputation
+      extends BasicComputation<LongWritable, DoubleWritable,
+      FloatWritable, LongWritable> {
+
+    @Override
+    public void compute(
+        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+        Iterable<LongWritable> messages) throws IOException {
+      Assert.assertNotNull(vertex);
+      vertex.voteToHalt();
+    }
+  }
+}
[2/2] git commit: updated refs/heads/trunk to f7c3025
Posted by cl...@apache.org.
GIRAPH-760
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f7c30258
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f7c30258
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f7c30258
Branch: refs/heads/trunk
Commit: f7c302587c1191d3c68508ea6395d55e22ea1175
Parents: b151d7a
Author: Claudio Martella <cl...@gmail.com>
Authored: Mon Nov 18 15:59:52 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Mon Nov 18 15:59:52 2013 +0100
----------------------------------------------------------------------
CHANGELOG | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/f7c30258/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c639ba1..e5a4a1b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-760: Create EdgeOutputFormat to Apache Gora (renato2099 via claudio)
+
GIRAPH-759: Create EdgeInputFormat from Apache Gora (renato2099 via claudio)
GIRAPH-758: Create VertexOutputFormat to Apache Gora (renato2099 via claudio)