You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2017/01/23 16:21:17 UTC
[3/3] incubator-rya git commit: initial giraph support; closes #132
initial giraph support; closes #132
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/45efa55b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/45efa55b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/45efa55b
Branch: refs/heads/master
Commit: 45efa55b01e58343d33522937c964d7fe6acfbe9
Parents: ccb5a67
Author: pujav65 <pu...@gmail.com>
Authored: Tue Dec 20 16:35:06 2016 -0500
Committer: pujav65 <pu...@gmail.com>
Committed: Mon Jan 23 11:20:35 2017 -0500
----------------------------------------------------------------------
.../apache/rya/sail/config/RyaSailFactory.java | 2 +-
extras/pom.xml | 3 +-
extras/rya.giraph/pom.xml | 85 ++++++++++++
.../rya/giraph/format/RyaEdgeInputFormat.java | 61 +++++++++
.../apache/rya/giraph/format/RyaEdgeReader.java | 84 ++++++++++++
.../rya/giraph/format/RyaGiraphUtils.java | 78 +++++++++++
.../rya/giraph/format/RyaVertexInputFormat.java | 75 +++++++++++
.../rya/giraph/format/RyaVertexReader.java | 94 +++++++++++++
.../rya/giraph/format/TestTextOutputFormat.java | 44 +++++++
.../rya/giraph/format/TestVertexFormat.java | 132 +++++++++++++++++++
.../apache/rya/accumulo/mr/RyaInputFormat.java | 14 ++
11 files changed, 670 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
index 60ab615..8ce6694 100644
--- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
@@ -133,7 +133,7 @@ public class RyaSailFactory {
return dao;
}
- private static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
+ public static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
final Connector connector = ConfigUtils.getConnector(config);
final AccumuloRyaDAO dao = new AccumuloRyaDAO();
dao.setConnector(connector);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index 2ac129a..6acb51f 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -42,7 +42,8 @@ under the License.
<module>rya.pcj.fluo</module>
<module>rya.export</module>
<module>rya.merger</module>
- <module>rya.benchmark</module>
+ <module>rya.giraph</module>
+ <module>rya.benchmark</module>
</modules>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/pom.xml b/extras/rya.giraph/pom.xml
new file mode 100644
index 0000000..2615b34
--- /dev/null
+++ b/extras/rya.giraph/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.extras</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rya.giraph</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-accumulo</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>2.5.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>2.5.1</version>
+ <scope>provided</scope>
+</dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>accumulo.rya</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>
+ hadoop-mapreduce-client-jobclient
+ </artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>
+ hadoop-mapreduce-client-core
+ </artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.mapreduce</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>spark-graphx_2.11</artifactId>
+ <groupId>org.apache.spark</groupId>
+ </exclusion>
+
+ </exclusions>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java
new file mode 100644
index 0000000..761e409
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java
@@ -0,0 +1,61 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.accumulo.mr.RyaInputFormat;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.accumulo.mr.RyaStatementWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.resolver.RyaTripleContext;
+
+public class RyaEdgeInputFormat extends EdgeInputFormat<Text, RyaStatementWritable> {
+
+ private RyaInputFormat ryaInputFormat = new RyaInputFormat();
+ private TABLE_LAYOUT rdfTableLayout;
+ private RyaTripleContext tripleContext;
+
+ @Override
+ public EdgeReader<Text, RyaStatementWritable> createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException {
+ return new RyaEdgeReader((RyaStatementRecordReader) ryaInputFormat.createRecordReader(split, context),
+ rdfTableLayout, tripleContext,
+ context.getConfiguration());
+ }
+
+ @Override
+ public void checkInputSpecs(Configuration arg0) {
+ // nothing to do
+
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int arg1) throws IOException, InterruptedException {
+ return ryaInputFormat.getSplits(context);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java
new file mode 100644
index 0000000..f0f9017
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java
@@ -0,0 +1,84 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.accumulo.mr.RyaStatementWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaTripleContext;
+
+/**
+ * Giraph {@link EdgeReader} that emits one edge for every statement produced by a
+ * {@link RyaStatementRecordReader}.
+ * <ul>
+ * <li>The source vertex id is the statement subject's data.</li>
+ * <li>The edge's target id is the statement's {@code toString()} form.</li>
+ * <li>The edge value is the statement itself.</li>
+ * </ul>
+ * NOTE(review): the target id is not the statement's object. Confirm that this is intended.
+ */
+public class RyaEdgeReader extends EdgeReader<Text, RyaStatementWritable>{
+
+    /** Source of statements, one per Accumulo row. */
+    private final RyaStatementRecordReader statementReader;
+    /** Triple context passed to the record reader on initialization. */
+    private final RyaTripleContext ryaContext;
+    /** Table layout passed to the record reader on initialization. */
+    private final TABLE_LAYOUT layout;
+
+    /**
+     * @param recordReader   reader supplying the statements
+     * @param rdfTableLayout layout of the table being read
+     * @param tripleContext  context handed to the record reader
+     * @param conf           currently unused
+     */
+    public RyaEdgeReader(RyaStatementRecordReader recordReader,
+            TABLE_LAYOUT rdfTableLayout, RyaTripleContext tripleContext, Configuration conf) {
+        statementReader = recordReader;
+        layout = rdfTableLayout;
+        ryaContext = tripleContext;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+        statementReader.initialize(inputSplit, context, ryaContext, layout);
+    }
+
+    @Override
+    public boolean nextEdge() throws IOException, InterruptedException {
+        return statementReader.nextKeyValue();
+    }
+
+    @Override
+    public Text getCurrentSourceId() throws IOException, InterruptedException {
+        final RyaStatement statement = statementReader.getCurrentValue().getRyaStatement();
+        return new Text(statement.getSubject().getData());
+    }
+
+    @Override
+    public Edge<Text, RyaStatementWritable> getCurrentEdge() throws IOException, InterruptedException {
+        final RyaStatementWritable current = statementReader.getCurrentValue();
+        final RyaStatement statement = current.getRyaStatement();
+        final Text targetId = new Text(statement.toString());
+        return EdgeFactory.create(targetId, current);
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return statementReader.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+        statementReader.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java
new file mode 100644
index 0000000..37b44d7
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java
@@ -0,0 +1,78 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.rya.accumulo.AccumuloRdfConstants;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.RdfCloudTripleStoreUtils;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Helpers shared by the Rya Giraph input formats.
+ */
+public class RyaGiraphUtils {
+
+    /**
+     * Marker written into the configuration once
+     * {@link #initializeAccumuloInputFormat(Configuration)} has copied the Accumulo input
+     * settings into it.
+     * <p>
+     * Accumulo's input format rejects a second attempt to set connector or instance
+     * information on the same job. Later calls against a configuration that already
+     * carries this marker are therefore skipped.
+     */
+    private static final String ACCUMULO_INPUT_INITIALIZED = "rya.giraph.accumulo.input.initialized";
+
+    /**
+     * Configures {@link AccumuloInputFormat} inside {@code conf} from the Rya MapReduce
+     * connection properties read through {@link MRUtils}. The settings are:
+     * <ul>
+     * <li>connector user and password;</li>
+     * <li>the input table, derived from the table prefix and layout (SPO by default);</li>
+     * <li>the scan authorizations, which are all authorizations when
+     * {@link MRUtils#AC_AUTH_PROP} is unset or empty;</li>
+     * <li>either a mock instance or a ZooKeeper instance.</li>
+     * </ul>
+     * <p>
+     * {@link Job#getInstance(Configuration)} works on a private copy of its argument. The
+     * settings are therefore applied to the job's configuration and then copied back into
+     * {@code conf}; without that copy they would be discarded when this method returns.
+     *
+     * @param conf configuration to read connection settings from and to write the
+     *             input-format settings into
+     * @throws IllegalStateException if the job cannot be created or Accumulo rejects the
+     *                               connector settings
+     */
+    public static void initializeAccumuloInputFormat(Configuration conf) {
+        if (conf.getBoolean(ACCUMULO_INPUT_INITIALIZED, false)) {
+            return;
+        }
+        // get accumulo connect information
+        boolean mock = MRUtils.getACMock(conf, false);
+        String zk = MRUtils.getACZK(conf);
+        String instance = MRUtils.getACInstance(conf);
+        String userName = MRUtils.getACUserName(conf);
+        String pwd = MRUtils.getACPwd(conf);
+        String tablePrefix = MRUtils.getTablePrefix(conf);
+        TABLE_LAYOUT rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.SPO);
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        Authorizations authorizations;
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        } else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+
+        // set up the accumulo input format so that we know what table to use and everything
+        try {
+            // The job holds its own copy of conf; everything below is written to that copy.
+            Job job = Job.getInstance(conf);
+            AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+            String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix);
+            AccumuloInputFormat.setInputTableName(job, tableName);
+            AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+            if (mock) {
+                AccumuloInputFormat.setMockInstance(job, instance);
+            } else {
+                ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
+                        .withInstance(instance).withZkHosts(zk);
+                AccumuloInputFormat.setZooKeeperInstance(job, clientConfig);
+            }
+            copyChangedEntries(job.getConfiguration(), conf);
+        } catch (IOException | AccumuloSecurityException e) {
+            throw new IllegalStateException("Unable to configure AccumuloInputFormat for Rya", e);
+        }
+        conf.setBoolean(ACCUMULO_INPUT_INITIALIZED, true);
+    }
+
+    /**
+     * Copies into {@code target} every entry of {@code source} whose raw value differs
+     * from the raw value already in {@code target}, or that is missing from it.
+     */
+    private static void copyChangedEntries(Configuration source, Configuration target) {
+        for (java.util.Map.Entry<String, String> entry : source) {
+            if (!entry.getValue().equals(target.getRaw(entry.getKey()))) {
+                target.set(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java
new file mode 100644
index 0000000..ecd2c13
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java
@@ -0,0 +1,75 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.accumulo.mr.RyaInputFormat;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.accumulo.mr.RyaStatementWritable;
+import org.apache.rya.accumulo.mr.RyaTypeWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.resolver.RyaTripleContext;
+
+/**
+ * Giraph vertex input format backed by a Rya triple table. Splits and record
+ * readers come from {@link RyaInputFormat}. Each statement read is turned into a
+ * vertex by {@link RyaVertexReader}.
+ */
+public class RyaVertexInputFormat extends AccumuloVertexInputFormat<Text, RyaTypeWritable, RyaStatementWritable> {
+
+    /** Delegate that computes splits and creates the statement record readers. */
+    private final RyaInputFormat statementFormat = new RyaInputFormat();
+    /** Table layout resolved in {@link #setConf}; SPO unless configured otherwise. */
+    private TABLE_LAYOUT layout;
+    /** Triple context resolved in {@link #setConf} and handed to each reader. */
+    private RyaTripleContext ryaContext;
+
+    /**
+     * Stores the configuration and then calls
+     * {@link RyaGiraphUtils#initializeAccumuloInputFormat}. Afterwards it resolves
+     * the table layout and triple context that every vertex reader receives.
+     *
+     * @param conf the Giraph job configuration
+     */
+    @Override
+    public void setConf(ImmutableClassesGiraphConfiguration<Text, RyaTypeWritable, RyaStatementWritable> conf) {
+        super.setConf(conf);
+        RyaGiraphUtils.initializeAccumuloInputFormat(conf);
+        layout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.SPO);
+        ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
+    }
+
+    /**
+     * Performs no validation; connection settings are applied in {@link #setConf}.
+     *
+     * @param conf the job configuration (unused)
+     */
+    @Override
+    public void checkInputSpecs(Configuration conf) {
+        // intentionally empty
+    }
+
+    /**
+     * Delegates to {@link RyaInputFormat#getSplits}. The split-count hint is not used.
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException {
+        return statementFormat.getSplits(context);
+    }
+
+    /**
+     * Wraps a freshly created statement record reader for {@code split} in a
+     * {@link RyaVertexReader}.
+     */
+    @Override
+    public VertexReader<Text, RyaTypeWritable, RyaStatementWritable> createVertexReader(InputSplit split,
+            TaskAttemptContext context) throws IOException {
+        final RyaStatementRecordReader statementReader =
+                (RyaStatementRecordReader) statementFormat.createRecordReader(split, context);
+        return new RyaVertexReader(statementReader, layout, ryaContext, context.getConfiguration());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java
new file mode 100644
index 0000000..dbb404d
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java
@@ -0,0 +1,94 @@
+package org.apache.rya.giraph.format;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.accumulo.mr.RyaStatementWritable;
+import org.apache.rya.accumulo.mr.RyaTypeWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaTripleContext;
+
+/**
+ * Giraph {@link VertexReader} that turns each statement produced by a
+ * {@link RyaStatementRecordReader} into a vertex.
+ * <ul>
+ * <li>The vertex id is the statement subject's data.</li>
+ * <li>The vertex value wraps the subject.</li>
+ * <li>The vertex has a single out-edge whose value is the statement.</li>
+ * </ul>
+ * NOTE(review): the edge's target id is the statement's {@code toString()} form
+ * rather than its object. Confirm that this is intended.
+ */
+public class RyaVertexReader extends VertexReader<Text, RyaTypeWritable, RyaStatementWritable>{
+
+    /** Source of statements, one per Accumulo row. */
+    private final RyaStatementRecordReader statementReader;
+    /** Triple context passed to the record reader on initialization. */
+    private final RyaTripleContext ryaContext;
+    /** Table layout passed to the record reader on initialization. */
+    private final TABLE_LAYOUT layout;
+    /** Giraph configuration used to instantiate vertices. */
+    private final ImmutableClassesGiraphConfiguration<Text, RyaTypeWritable, RyaStatementWritable> vertexFactoryConf;
+
+    /**
+     * @param recordReader   reader supplying the statements
+     * @param rdfTableLayout layout of the table being read
+     * @param tripleContext  context handed to the record reader
+     * @param conf           configuration wrapped to create vertex instances
+     */
+    public RyaVertexReader(RyaStatementRecordReader recordReader,
+            TABLE_LAYOUT rdfTableLayout, RyaTripleContext tripleContext, Configuration conf) {
+        statementReader = recordReader;
+        layout = rdfTableLayout;
+        ryaContext = tripleContext;
+        vertexFactoryConf =
+                new ImmutableClassesGiraphConfiguration<Text, RyaTypeWritable, RyaStatementWritable>(conf);
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+        statementReader.initialize(inputSplit, context, ryaContext, layout);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return statementReader.nextKeyValue();
+    }
+
+    @Override
+    public Vertex<Text, RyaTypeWritable, RyaStatementWritable> getCurrentVertex() throws IOException, InterruptedException {
+        final RyaStatementWritable current = statementReader.getCurrentValue();
+        final RyaStatement statement = current.getRyaStatement();
+
+        // Vertex value and id both come from the statement's subject.
+        final RyaTypeWritable subjectValue = new RyaTypeWritable();
+        subjectValue.setRyaType(statement.getSubject());
+        final Text subjectId = new Text(statement.getSubject().getData());
+
+        final Vertex<Text, RyaTypeWritable, RyaStatementWritable> vertex = vertexFactoryConf.createVertex();
+        final List<Edge<Text, RyaStatementWritable>> outEdges = new ArrayList<>(1);
+        outEdges.add(EdgeFactory.create(new Text(statement.toString()), current));
+        vertex.initialize(subjectId, subjectValue, outEdges);
+        return vertex;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return statementReader.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+        statementReader.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java
new file mode 100644
index 0000000..be297db
--- /dev/null
+++ b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java
@@ -0,0 +1,44 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Test vertex output format whose writer prints each vertex to standard output.
+ */
+public class TestTextOutputFormat extends TextVertexOutputFormat<Text, Text, Text> {
+
+    /**
+     * @param context the task attempt context (unused)
+     * @return a writer that prints vertices to {@code System.out}
+     */
+    @Override
+    public TextVertexOutputFormat<Text, Text, Text>.TextVertexWriter createVertexWriter(TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SystemOutVertexWriter();
+    }
+
+    /**
+     * Writer that prints each vertex's {@code toString()} on {@code System.out}. It is an
+     * inner (non-static) class because {@code TextVertexWriter} is an inner class of the
+     * output format.
+     */
+    public class SystemOutVertexWriter extends TextVertexWriter {
+        @Override
+        public void writeVertex(Vertex<Text, Text, Text> vertex) throws IOException, InterruptedException {
+            System.out.println(vertex);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java
new file mode 100644
index 0000000..920e876
--- /dev/null
+++ b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rya.giraph.format;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.giraph.format.RyaVertexInputFormat;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.Test;
+
+/*
+ * Tests for the Rya Giraph vertex input format.
+ *
+ * NOTE(review): the single test stores statements in a mock Accumulo instance
+ * and configures a GiraphJob. It never runs the job and asserts nothing
+ * (assertTrue is imported but unused), so RyaVertexInputFormat is not
+ * actually exercised yet.
+ */
+public class TestVertexFormat extends BspCase {
+
+ private final Logger log = Logger.getLogger(TestVertexFormat.class);
+
+ /**
+ * Creates the test case, named after this class, and points the
+ * java.io.tmpdir system property at target/test.
+ */
+ public TestVertexFormat() {
+ super(TestVertexFormat.class.getName());
+ System.setProperty("java.io.tmpdir", "target/test");
+}
+
+ /**
+ * Builds a Rya configuration that uses a mock Accumulo instance named "test".
+ * The user is "root" with an empty password and empty auths, the table
+ * prefix is "rya_", and the PCJ, free-text and temporal indexers are disabled.
+ * NOTE(review): RyaGiraphUtils reads its connection settings through MRUtils
+ * getters. Confirm that those resolve the ConfigUtils keys set here.
+ */
+ private static AccumuloRdfConfiguration getConf() {
+
+ final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+
+ conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
+ conf.set(ConfigUtils.USE_PCJ, "false");
+ conf.set(ConfigUtils.USE_FREETEXT, "false");
+ conf.set(ConfigUtils.USE_TEMPORAL, "false");
+ conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_");
+ conf.set(ConfigUtils.CLOUDBASE_USER, "root");
+ conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "");
+ conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "test");
+ conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
+ return conf;
+ }
+
+ /*
+ Stores four statements through an AccumuloRyaDAO. All share the subject
+ urn:test#1234, with predicates pred1..pred4 and objects obj1..obj4.
+ It then builds a GiraphJob that uses EdgeNotification, RyaVertexInputFormat
+ and TestTextOutputFormat.
+ NOTE(review): job.run(...) is never called and nothing is asserted.
+ Running the job would likely also require reconciling types.
+ RyaVertexInputFormat yields <Text, RyaTypeWritable, RyaStatementWritable>
+ vertices, while EdgeNotification and TestTextOutputFormat are declared over
+ Text. Confirm before enabling.
+ */
+ @Test
+ public void testRyaInput() throws Exception {
+
+ AccumuloRdfConfiguration conf = getConf();
+ AccumuloRyaDAO ryaDAO = RyaSailFactory.getAccumuloDAO(conf);
+
+ ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"),
+ new RyaURI("urn:test#pred1"),
+ new RyaURI("urn:test#obj1")));
+ ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"),
+ new RyaURI("urn:test#pred2"),
+ new RyaURI("urn:test#obj2")));
+ ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"),
+ new RyaURI("urn:test#pred3"),
+ new RyaURI("urn:test#obj3")));
+ ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"),
+ new RyaURI("urn:test#pred4"),
+ new RyaURI("urn:test#obj4")));
+ ryaDAO.flush();
+
+ GiraphJob job = new GiraphJob(conf, getCallingMethodName());
+
+ setupConfiguration(job);
+ GiraphConfiguration giraphConf = job.getConfiguration();
+ giraphConf.setComputationClass(EdgeNotification.class);
+ giraphConf.setVertexInputFormatClass(RyaVertexInputFormat.class);
+ giraphConf.setVertexOutputFormatClass(TestTextOutputFormat.class);
+
+
+ // Only logs; the job configured above is not run.
+ if (log.isInfoEnabled())
+ log.info("Running edge notification job using Rya Vertex input");
+
+ }
+
+ /*
+ Computation used by the test, with three steps:
+ - Each incoming message overwrites the vertex value, so the last message wins.
+ - In superstep 0, each vertex sends its own id along all of its out-edges.
+ - Every vertex then votes to halt.
+ */
+ public static class EdgeNotification
+ extends BasicComputation<Text, Text, Text, Text> {
+ @Override
+ public void compute(Vertex<Text, Text, Text> vertex,
+ Iterable<Text> messages) throws IOException {
+ for (Text message : messages) {
+ vertex.getValue().set(message);
+ }
+ if(getSuperstep() == 0) {
+ sendMessageToAllEdges(vertex, vertex.getId());
+ }
+ vertex.voteToHalt();
+ }
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
index baa7033..2fc2728 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -95,6 +96,19 @@ public class RyaInputFormat extends AbstractInputFormat<Text, RyaStatementWritab
}
/**
+ * Initializes the RecordReader with an explicitly supplied triple context
+ * and table layout. It first calls {@code super.initialize(inSplit, attempt)}
+ * and then stores {@code tableLayout} and {@code context} on this reader.
+ * Neither argument is checked for null.
+ * @param inSplit Defines the portion of data to read.
+ * @param attempt Context for this task attempt.
+ * @param context Rya triple context to store on this reader. It is presumably
+ *        used to decode rows; TODO confirm against nextKeyValue.
+ * @param tableLayout Layout of the table being read, stored on this reader.
+ * @throws IOException if thrown by the superclass's initialize method.
+ */
+ public void initialize(InputSplit inSplit, TaskAttemptContext attempt, RyaTripleContext context, TABLE_LAYOUT tableLayout) throws IOException {
+ super.initialize(inSplit, attempt);
+ this.tableLayout = tableLayout;
+ //TODO verify that this is correct
+ this.ryaContext = context;
+ }
+
+ /**
* Load the next statement by converting the next Accumulo row to a
* statement, and make the new (key,value) pair available for retrieval.
* @return true if another (key,value) pair was fetched and is ready to