You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/12/20 21:37:51 UTC
[2/3] incubator-rya git commit: Bugfixes and updates for Spark support
Bugfixes and updates for Spark support
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3f27536a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3f27536a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3f27536a
Branch: refs/heads/master
Commit: 3f27536a38af4983f697ecd0540e9a82185b3262
Parents: 77ff31e
Author: Jesse Hatfield <je...@parsons.com>
Authored: Thu Sep 8 16:40:55 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Dec 20 10:47:20 2016 -0500
----------------------------------------------------------------------
.../rya/indexing/accumulo/ConfigUtils.java | 1 -
.../accumulo/entity/EntityCentricIndex.java | 23 +-
mapreduce/pom.xml | 20 +-
.../rya/accumulo/mr/GraphXEdgeInputFormat.java | 209 ------------------
.../mvm/rya/accumulo/mr/GraphXInputFormat.java | 132 ------------
.../mvm/rya/accumulo/mr/RyaTypeWritable.java | 74 -------
.../rya/accumulo/mr/GraphXEdgeInputFormat.java | 216 +++++++++++++++++++
.../rya/accumulo/mr/GraphXInputFormat.java | 147 +++++++++++++
.../apache/rya/accumulo/mr/RyaTypeWritable.java | 123 +++++++++++
.../accumulo/mr/GraphXEdgeInputFormatTest.java | 134 ------------
.../rya/accumulo/mr/GraphXInputFormatTest.java | 144 -------------
.../accumulo/mr/GraphXEdgeInputFormatTest.java | 134 ++++++++++++
.../rya/accumulo/mr/GraphXInputFormatTest.java | 142 ++++++++++++
pom.xml | 1 +
spark/pom.xml | 6 +-
.../accumulo/spark/GraphXGraphGenerator.java | 188 ----------------
.../accumulo/spark/GraphXGraphGenerator.java | 183 ++++++++++++++++
17 files changed, 978 insertions(+), 899 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
index e9e6c31..61a1003 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -365,7 +365,6 @@ public class ConfigUtils {
public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
- System.out.println("Testuing");
final List<String> indexList = Lists.newArrayList();
final List<String> optimizers = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
index d58b1f1..0676e3d 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
@@ -24,6 +24,7 @@ import static org.apache.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static org.apache.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTES;
import java.io.IOException;
import java.util.Arrays;
@@ -281,24 +282,26 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
final byte[] columnFamily = Arrays.copyOf(data, split);
final byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
- final String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
- final byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length - 2);
- final byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length);
+ String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
+ byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length);
+ split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES);
+ byte[] otherNodeData = Arrays.copyOf(otherNodeBytes, split);
+ byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes, split, otherNodeBytes.length);
byte[] objectBytes;
RyaURI subject;
final RyaURI predicate = new RyaURI(new String(predicateBytes));
RyaType object;
RyaURI context = null;
- // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker}
- // or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker}
+ // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype}
+ // or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype}
switch (otherNodeVar) {
case "subject":
- subject = new RyaURI(new String(otherNodeBytes));
+ subject = new RyaURI(new String(otherNodeData));
objectBytes = Bytes.concat(entityBytes, typeBytes);
break;
case "object":
subject = new RyaURI(new String(entityBytes));
- objectBytes = Bytes.concat(otherNodeBytes, typeBytes);
+ objectBytes = Bytes.concat(otherNodeData, typeBytes);
break;
default:
throw new IOException("Failed to deserialize entity-centric index row. "
@@ -311,7 +314,7 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
return new RyaStatement(subject, predicate, object, context,
null, columnVisibility, valueBytes, timestamp);
}
-
+
/**
* Return the RyaType of the Entity Centric Index row.
* @param key Row key, contains statement data
@@ -332,7 +335,9 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
- byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length);
+ byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length);
+ split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES);
+ byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes, split, otherNodeBytes.length);
byte[] objectBytes;
RyaURI subject;
RyaType object;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/mapreduce/pom.xml b/mapreduce/pom.xml
index d222845..c4f94f6 100644
--- a/mapreduce/pom.xml
+++ b/mapreduce/pom.xml
@@ -31,11 +31,11 @@ under the License.
<name>Apache Rya MapReduce Tools</name>
<dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-graphx_2.11</artifactId>
- <version>1.6.2</version>
- </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-graphx_2.11</artifactId>
+ <version>1.6.2</version>
+ </dependency>
<dependency>
<groupId>org.apache.rya</groupId>
<artifactId>rya.api</artifactId>
@@ -116,6 +116,16 @@ under the License.
<executions>
<execution>
<configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
deleted file mode 100644
index 79d6e82..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
+++ /dev/null
@@ -1,209 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-import java.util.Map.Entry;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
-import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.spark.graphx.Edge;
-
-/**
- * Subclass of {@link AbstractInputFormat} for reading
- * {@link RyaStatementWritable}s directly from a running Rya instance.
- */
-@SuppressWarnings("rawtypes")
-public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
- /**
- * Instantiates a RecordReader for this InputFormat and a given task and
- * input split.
- *
- * @param split
- * Defines the portion of the input this RecordReader is
- * responsible for.
- * @param context
- * The context of the task.
- * @return A RecordReader that can be used to fetch RyaStatementWritables.
- */
- @Override
- public RecordReader<Object, Edge> createRecordReader(InputSplit split,
- TaskAttemptContext context) {
- return new RyaStatementRecordReader();
- }
-
- /**
- * Sets the table layout to use.
- *
- * @param conf
- * Configuration to set the layout in.
- * @param layout
- * Statements will be read from the Rya table associated with
- * this layout.
- */
- public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
- conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
- }
-
- /**
- * Retrieves RyaStatementWritable objects from Accumulo tables.
- */
- public class RyaStatementRecordReader extends
- AbstractRecordReader<Object, Edge> {
- private RyaTripleContext ryaContext;
- private TABLE_LAYOUT tableLayout;
-
- protected void setupIterators(TaskAttemptContext context,
- Scanner scanner, String tableName, RangeInputSplit split) {
- }
-
- /**
- * Initializes the RecordReader.
- *
- * @param inSplit
- * Defines the portion of data to read.
- * @param attempt
- * Context for this task attempt.
- * @throws IOException
- * if thrown by the superclass's initialize method.
- */
- @Override
- public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
- throws IOException {
- super.initialize(inSplit, attempt);
- this.tableLayout = MRUtils.getTableLayout(
- attempt.getConfiguration(), TABLE_LAYOUT.SPO);
- // TODO verify that this is correct
- this.ryaContext = RyaTripleContext
- .getInstance(new AccumuloRdfConfiguration(attempt
- .getConfiguration()));
- }
-
- /**
- * Load the next statement by converting the next Accumulo row to a
- * statement, and make the new (key,value) pair available for retrieval.
- *
- * @return true if another (key,value) pair was fetched and is ready to
- * be retrieved, false if there was none.
- * @throws IOException
- * if a row was loaded but could not be converted to a
- * statement.
- */
- @Override
- public boolean nextKeyValue() throws IOException {
- if (!scannerIterator.hasNext())
- return false;
- Entry<Key, Value> entry = scannerIterator.next();
- ++numKeysRead;
- currentKey = entry.getKey();
- try {
- currentK = currentKey.getRow();
- RyaTypeWritable rtw = null;
- RyaStatement stmt = this.ryaContext.deserializeTriple(
- this.tableLayout, new TripleRow(entry.getKey().getRow()
- .getBytes(), entry.getKey().getColumnFamily()
- .getBytes(), entry.getKey()
- .getColumnQualifier().getBytes(), entry
- .getKey().getTimestamp(), entry.getKey()
- .getColumnVisibility().getBytes(), entry
- .getValue().get()));
-
- String subjURI = stmt.getSubject().getDataType().toString();
- String objURI = stmt.getObject().getDataType().toString();
-
- // SHA-256 the string value and then generate a hashcode from
- // the digested string, the collision ratio is less than 0.0001%
- // using custom hash function should significantly reduce the
- // collision ratio
- MessageDigest messageDigest = MessageDigest
- .getInstance("SHA-256");
-
- messageDigest.update(subjURI.getBytes());
- String encryptedString = new String(messageDigest.digest());
- long subHash = hash(encryptedString);
-
- messageDigest.update(objURI.getBytes());
- encryptedString = new String(messageDigest.digest());
- long objHash = hash(encryptedString);
-
- Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
- subHash, objHash, rtw);
- currentV = writable;
- } catch (TripleRowResolverException | NoSuchAlgorithmException e) {
- throw new IOException(e);
- }
- return true;
- }
-
- protected List<IteratorSetting> contextIterators(
- TaskAttemptContext context, String tableName) {
- return getIterators(context);
- }
-
- @Override
- protected void setupIterators(TaskAttemptContext context,
- Scanner scanner, String tableName,
- org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
- List<IteratorSetting> iterators = null;
-
- if (null == split) {
- iterators = contextIterators(context, tableName);
- } else {
- iterators = split.getIterators();
- if (null == iterators) {
- iterators = contextIterators(context, tableName);
- }
- }
-
- for (IteratorSetting iterator : iterators)
- scanner.addScanIterator(iterator);
- }
-
- }
-
- public static long hash(String string) {
- long h = 1125899906842597L; // prime
- int len = string.length();
-
- for (int i = 0; i < len; i++) {
- h = 31 * h + string.charAt(i);
- }
- return h;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
deleted file mode 100644
index 6ec5c74..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> {
-
- private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23;
-
- /**
- * Instantiates a RecordReader for this InputFormat and a given task and
- * input split.
- *
- * @param split
- * Defines the portion of the input this RecordReader is
- * responsible for.
- * @param context
- * The context of the task.
- * @return A RecordReader that can be used to fetch RyaStatementWritables.
- */
- @Override
- public RecordReader<Object, RyaTypeWritable> createRecordReader(
- InputSplit split, TaskAttemptContext context) {
- return new RyaStatementRecordReader();
- }
-
-
-
- /**
- * Retrieves RyaStatementWritable objects from Accumulo tables.
- */
- public class RyaStatementRecordReader extends
- AbstractRecordReader<Object, RyaTypeWritable> {
- protected void setupIterators(TaskAttemptContext context,
- Scanner scanner, String tableName,
- @SuppressWarnings("deprecation") RangeInputSplit split) {
- IteratorSetting iteratorSetting = new IteratorSetting(
- WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class);
- scanner.addScanIterator(iteratorSetting);
- }
-
- /**
- * Initializes the RecordReader.
- *
- * @param inSplit
- * Defines the portion of data to read.
- * @param attempt
- * Context for this task attempt.
- * @throws IOException
- * if thrown by the superclass's initialize method.
- */
- @Override
- public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
- throws IOException {
- super.initialize(inSplit, attempt);
- }
-
- /**
- * Load the next statement by converting the next Accumulo row to a
- * statement, and make the new (key,value) pair available for retrieval.
- *
- * @return true if another (key,value) pair was fetched and is ready to
- * be retrieved, false if there was none.
- * @throws IOException
- * if a row was loaded but could not be converted to a
- * statement.
- */
- @Override
- public boolean nextKeyValue() throws IOException {
- if (!scannerIterator.hasNext())
- return false;
- Entry<Key, Value> entry = scannerIterator.next();
- ++numKeysRead;
- currentKey = entry.getKey();
-
- try {
- currentK = currentKey.getRow();
- SortedMap<Key, Value> wholeRow = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
- Key key = wholeRow.firstKey();
- Value value = wholeRow.get(key);
- RyaType type = EntityCentricIndex.getRyaType(key, value);
- RyaTypeWritable writable = new RyaTypeWritable();
- writable.setRyaType(type);
- currentV = writable;
- } catch (RyaTypeResolverException e) {
- throw new IOException();
- }
- return true;
- }
-
- protected List<IteratorSetting> contextIterators(
- TaskAttemptContext context, String tableName) {
- return getIterators(context);
- }
-
- @Override
- protected void setupIterators(TaskAttemptContext context,
- Scanner scanner, String tableName,
- org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
-
- List<IteratorSetting> iterators = null;
-
- if (null == split) {
- iterators = contextIterators(context, tableName);
- } else {
- iterators = split.getIterators();
- if (null == iterators) {
- iterators = contextIterators(context, tableName);
- }
- }
-
- for (IteratorSetting iterator : iterators)
- scanner.addScanIterator(iterator);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
deleted file mode 100644
index ddc0948..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{
-
- private RyaType ryatype;
-
- /**
- * Read part of a statement from an input stream.
- * @param dataInput Stream for reading serialized statements.
- * @return The next individual field, as a byte array.
- * @throws IOException if reading from the stream fails.
- */
- protected byte[] read(DataInput dataInput) throws IOException {
- if (dataInput.readBoolean()) {
- int len = dataInput.readInt();
- byte[] bytes = new byte[len];
- dataInput.readFully(bytes);
- return bytes;
- }else {
- return null;
- }
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- ValueFactoryImpl vfi = new ValueFactoryImpl();
- String data = dataInput.readLine();
- String dataTypeString = dataInput.readLine();
- URI dataType = vfi.createURI(dataTypeString);
- ryatype.setData(data);
- ryatype.setDataType(dataType);
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeChars(ryatype.getData());
- dataOutput.writeChars(ryatype.getDataType().toString());
- }
-
- /**
- * Gets the contained RyaStatement.
- * @return The statement represented by this RyaStatementWritable.
- */
- public RyaType getRyaType() {
- return ryatype;
- }
- /**
- * Sets the contained RyaStatement.
- * @param ryaStatement The statement to be represented by this
- * RyaStatementWritable.
- */
- public void setRyaType(RyaType ryatype) {
- this.ryatype = ryatype;
- }
-
- @Override
- public int compareTo(RyaTypeWritable o) {
- return ryatype.compareTo(o.ryatype);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
new file mode 100644
index 0000000..489fd34
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
@@ -0,0 +1,216 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RyaTripleContext;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.graphx.Edge;
+
+/**
+ * Subclass of {@link AbstractInputFormat} for reading statements from a
+ * running Rya instance and emitting each one as a GraphX {@link Edge}.
+ */
+@SuppressWarnings("rawtypes")
+public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
+ /**
+ * Instantiates a RecordReader for this InputFormat and a given task and
+ * input split.
+ *
+ * @param split
+ * Defines the portion of the input this RecordReader is
+ * responsible for.
+ * @param context
+ * The context of the task.
+ * @return A RecordReader that yields one GraphX {@link Edge} per statement.
+ */
+ @Override
+ public RecordReader<Object, Edge> createRecordReader(InputSplit split,
+ TaskAttemptContext context) {
+ return new RyaStatementRecordReader();
+ }
+
+ /**
+ * Sets the table layout to use.
+ *
+ * @param conf
+ * Configuration to set the layout in.
+ * @param layout
+ * Statements will be read from the Rya table associated with
+ * this layout.
+ */
+ public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
+ conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
+ }
+
+ /**
+ * Reads Rya statements from Accumulo and converts each into an {@link Edge}.
+ */
+ public class RyaStatementRecordReader extends
+ AbstractRecordReader<Object, Edge> {
+ private RyaTripleContext ryaContext;
+ private TABLE_LAYOUT tableLayout;
+
+ protected void setupIterators(TaskAttemptContext context,
+ Scanner scanner, String tableName, RangeInputSplit split) {
+ }
+
+ /**
+ * Initializes the RecordReader.
+ *
+ * @param inSplit
+ * Defines the portion of data to read.
+ * @param attempt
+ * Context for this task attempt.
+ * @throws IOException
+ * if thrown by the superclass's initialize method.
+ */
+ @Override
+ public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
+ throws IOException {
+ super.initialize(inSplit, attempt);
+ this.tableLayout = MRUtils.getTableLayout(
+ attempt.getConfiguration(), TABLE_LAYOUT.SPO);
+ // TODO verify that this is correct
+ this.ryaContext = RyaTripleContext
+ .getInstance(new AccumuloRdfConfiguration(attempt
+ .getConfiguration()));
+ }
+
+ /**
+ * Converts the next Accumulo row to a statement, then to an Edge from
+ * the subject's vertex id to the object's, labeled with the predicate.
+ *
+ * @return true if another (key,value) pair was fetched and is ready to
+ * be retrieved, false if there was none.
+ * @throws IOException
+ * if a row was loaded but could not be converted to a
+ * statement.
+ */
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ if (!scannerIterator.hasNext())
+ return false;
+ Entry<Key, Value> entry = scannerIterator.next();
+ ++numKeysRead;
+ currentKey = entry.getKey();
+ try {
+ currentK = currentKey.getRow();
+ RyaTypeWritable rtw = new RyaTypeWritable();
+ RyaStatement stmt = this.ryaContext.deserializeTriple(
+ this.tableLayout, new TripleRow(entry.getKey().getRow()
+ .getBytes(), entry.getKey().getColumnFamily()
+ .getBytes(), entry.getKey()
+ .getColumnQualifier().getBytes(), entry
+ .getKey().getTimestamp(), entry.getKey()
+ .getColumnVisibility().getBytes(), entry
+ .getValue().get()));
+
+ long subHash = getVertexId(stmt.getSubject());
+ long objHash = getVertexId(stmt.getObject());
+ rtw.setRyaType(stmt.getPredicate());
+
+ Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
+ subHash, objHash, rtw);
+ currentV = writable;
+ } catch (TripleRowResolverException e) {
+ throw new IOException(e);
+ }
+ return true;
+ }
+
+ protected List<IteratorSetting> contextIterators(
+ TaskAttemptContext context, String tableName) {
+ return getIterators(context);
+ }
+
+ @Override
+ protected void setupIterators(TaskAttemptContext context,
+ Scanner scanner, String tableName,
+ org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+ List<IteratorSetting> iterators = null;
+
+ if (null == split) {
+ iterators = contextIterators(context, tableName);
+ } else {
+ iterators = split.getIterators();
+ if (null == iterators) {
+ iterators = contextIterators(context, tableName);
+ }
+ }
+
+ for (IteratorSetting iterator : iterators)
+ scanner.addScanIterator(iterator);
+ }
+
+ }
+ /** Maps a resource's data string to a GraphX vertex id; null hashes as "". */
+ public static long getVertexId(RyaType resource) throws IOException {
+ String uri = "";
+ if (resource != null) {
+ uri = resource.getData().toString();
+ }
+ try {
+ // SHA-256 the UTF-8 bytes, then fold the digest into a long. Decode the
+ // digest as ISO-8859-1 so each byte maps to exactly one char; decoding
+ // with the platform charset (often UTF-8) turns invalid sequences into
+ // U+FFFD, discarding digest bits and making vertex-id collisions likelier.
+ MessageDigest messageDigest = MessageDigest
+ .getInstance("SHA-256");
+ messageDigest.update(uri.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ String digestString = new String(messageDigest.digest(), java.nio.charset.StandardCharsets.ISO_8859_1);
+ return hash(digestString);
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new IOException(e);
+ }
+ }
+ /** 64-bit polynomial (multiplier 31) hash over the string's chars. */
+ public static long hash(String string) {
+ long h = 1125899906842597L; // prime
+ int len = string.length();
+
+ for (int i = 0; i < len; i++) {
+ h = 31 * h + string.charAt(i);
+ }
+ return h;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
new file mode 100644
index 0000000..77f4e63
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
@@ -0,0 +1,147 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RyaTypeResolverException;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Input format that scans an entity-centric index table and emits one
+ * (vertex id, {@link RyaTypeWritable}) pair per Accumulo entry, intended as
+ * the vertex source for a GraphX graph.
+ */
+public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> {
+
+    /** Scan priority used when attaching a {@link WholeRowIterator}. */
+    private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23;
+
+    /**
+     * Instantiates a RecordReader for this InputFormat and a given task and
+     * input split.
+     *
+     * @param split
+     *            Defines the portion of the input this RecordReader is
+     *            responsible for.
+     * @param context
+     *            The context of the task.
+     * @return A RecordReader that yields vertex ids paired with
+     *         RyaTypeWritables.
+     */
+    @Override
+    public RecordReader<Object, RyaTypeWritable> createRecordReader(
+            InputSplit split, TaskAttemptContext context) {
+        return new RyaStatementRecordReader();
+    }
+
+    /**
+     * Retrieves RyaTypeWritable objects, keyed by vertex id, from Accumulo
+     * tables.
+     */
+    public class RyaStatementRecordReader extends
+            AbstractRecordReader<Object, RyaTypeWritable> {
+
+        /**
+         * Attaches a {@link WholeRowIterator} to the scanner.
+         *
+         * <p>NOTE(review): this overload takes the deprecated nested
+         * {@code RangeInputSplit} type and is not annotated {@code @Override},
+         * while the annotated overload below does not add a WholeRowIterator.
+         * Confirm which one the framework actually invokes.
+         */
+        protected void setupIterators(TaskAttemptContext context,
+                Scanner scanner, String tableName,
+                @SuppressWarnings("deprecation") RangeInputSplit split) {
+            IteratorSetting iteratorSetting = new IteratorSetting(
+                    WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class);
+            scanner.addScanIterator(iteratorSetting);
+        }
+
+        /**
+         * Initializes the RecordReader.
+         *
+         * @param inSplit
+         *            Defines the portion of data to read.
+         * @param attempt
+         *            Context for this task attempt.
+         * @throws IOException
+         *             if thrown by the superclass's initialize method.
+         */
+        @Override
+        public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
+                throws IOException {
+            super.initialize(inSplit, attempt);
+        }
+
+        /**
+         * Load the next Accumulo entry, convert it to a RyaType and make the
+         * (vertex id, RyaTypeWritable) pair available for retrieval.
+         *
+         * @return true if another pair was fetched and is ready to be
+         *         retrieved, false if there was none.
+         * @throws IOException
+         *             if an entry was loaded but could not be converted to a
+         *             RyaType (the resolver exception is kept as the cause).
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException {
+            if (!scannerIterator.hasNext()) {
+                return false;
+            }
+            Entry<Key, Value> entry = scannerIterator.next();
+            ++numKeysRead;
+            currentKey = entry.getKey();
+
+            try {
+                RyaType type = EntityCentricIndex.getRyaType(currentKey, entry.getValue());
+                RyaTypeWritable writable = new RyaTypeWritable();
+                writable.setRyaType(type);
+                currentK = GraphXEdgeInputFormat.getVertexId(type);
+                currentV = writable;
+            } catch (RyaTypeResolverException e) {
+                // Keep the cause and identify the offending key for debugging.
+                throw new IOException("Unable to convert entry to a RyaType: " + currentKey, e);
+            }
+            return true;
+        }
+
+        /**
+         * Returns the iterators configured on the job context.
+         * {@code tableName} is currently unused.
+         */
+        protected List<IteratorSetting> contextIterators(
+                TaskAttemptContext context, String tableName) {
+            return getIterators(context);
+        }
+
+        /**
+         * Attaches scan iterators to {@code scanner}: those recorded on
+         * {@code split} when present, otherwise the job-level ones.
+         */
+        @Override
+        protected void setupIterators(TaskAttemptContext context,
+                Scanner scanner, String tableName,
+                org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+
+            List<IteratorSetting> iterators = null;
+
+            if (null == split) {
+                iterators = contextIterators(context, tableName);
+            } else {
+                iterators = split.getIterators();
+                if (null == iterators) {
+                    iterators = contextIterators(context, tableName);
+                }
+            }
+
+            for (IteratorSetting iterator : iterators) {
+                scanner.addScanIterator(iterator);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
new file mode 100644
index 0000000..ec47d82
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
@@ -0,0 +1,123 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.rya.api.domain.RyaType;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+/**
+ * Hadoop {@link WritableComparable} wrapper around a {@link RyaType}.
+ *
+ * <p>Wire format (must stay symmetric between {@link #write(DataOutput)} and
+ * {@link #readFields(DataInput)}): a boolean telling whether a RyaType is
+ * present, followed, if so, by its data and its datatype URI. Each of those
+ * is encoded as a presence flag, an int byte length and that many UTF-8
+ * bytes, which is the layout read by {@link #read(DataInput)}.
+ */
+public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{
+
+    private RyaType ryatype;
+
+    /**
+     * Read one optional, length-prefixed field from an input stream.
+     * @param dataInput Stream to read from.
+     * @return The field's bytes, or null if the field was written as absent.
+     * @throws IOException if reading from the stream fails.
+     */
+    protected byte[] read(DataInput dataInput) throws IOException {
+        if (dataInput.readBoolean()) {
+            int len = dataInput.readInt();
+            byte[] bytes = new byte[len];
+            dataInput.readFully(bytes);
+            return bytes;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Write one optional field in the layout expected by {@link #read(DataInput)}.
+     * @param dataOutput Stream to write to.
+     * @param bytes The field's bytes, or null to mark the field absent.
+     * @throws IOException if writing to the stream fails.
+     */
+    private static void writeField(DataOutput dataOutput, byte[] bytes) throws IOException {
+        if (bytes == null) {
+            dataOutput.writeBoolean(false);
+        } else {
+            dataOutput.writeBoolean(true);
+            dataOutput.writeInt(bytes.length);
+            dataOutput.write(bytes);
+        }
+    }
+
+    /**
+     * Replaces the contained RyaType with one read from {@code dataInput}.
+     * A fresh RyaType is created, so an instance previously passed to
+     * {@link #setRyaType(RyaType)} is never modified; this also works on a
+     * newly constructed (empty) writable.
+     * @param dataInput Stream produced by {@link #write(DataOutput)}.
+     * @throws IOException if reading from the stream fails.
+     */
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        if (!dataInput.readBoolean()) {
+            ryatype = null;
+            return;
+        }
+        byte[] dataBytes = read(dataInput);
+        byte[] dataTypeBytes = read(dataInput);
+        RyaType type = new RyaType();
+        type.setData(dataBytes == null ? null : new String(dataBytes, StandardCharsets.UTF_8));
+        URI dataType = null;
+        if (dataTypeBytes != null) {
+            ValueFactoryImpl vfi = new ValueFactoryImpl();
+            dataType = vfi.createURI(new String(dataTypeBytes, StandardCharsets.UTF_8));
+        }
+        type.setDataType(dataType);
+        ryatype = type;
+    }
+
+    /**
+     * Serializes the contained RyaType, which may be null, as may its data
+     * and datatype.
+     * @param dataOutput Stream to write to.
+     * @throws IOException if writing to the stream fails.
+     */
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        dataOutput.writeBoolean(ryatype != null);
+        if (ryatype == null) {
+            return;
+        }
+        String data = ryatype.getData();
+        URI dataType = ryatype.getDataType();
+        writeField(dataOutput, data == null ? null : data.getBytes(StandardCharsets.UTF_8));
+        writeField(dataOutput, dataType == null ? null
+                : dataType.toString().getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Gets the contained RyaType.
+     * @return The value represented by this RyaTypeWritable; may be null.
+     */
+    public RyaType getRyaType() {
+        return ryatype;
+    }
+
+    /**
+     * Sets the contained RyaType.
+     * @param ryatype The value to be represented by this RyaTypeWritable.
+     */
+    public void setRyaType(RyaType ryatype) {
+        this.ryatype = ryatype;
+    }
+
+    /**
+     * Orders by the enclosed RyaTypes. A null RyaType sorts before any
+     * non-null one, consistent with the null handling in {@link #equals}.
+     */
+    @Override
+    public int compareTo(RyaTypeWritable o) {
+        RyaType other = o.ryatype;
+        if (ryatype == null || other == null) {
+            if (ryatype == other) {
+                return 0;
+            }
+            return ryatype == null ? -1 : 1;
+        }
+        return ryatype.compareTo(other);
+    }
+
+    /**
+     * Tests for equality using the equals method of the enclosed RyaType.
+     * @param o Object to compare with
+     * @return true if both objects are RyaTypeWritables containing equivalent
+     * RyaTypes.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || !(o instanceof RyaTypeWritable)) {
+            return false;
+        }
+        RyaType rtThis = ryatype;
+        RyaType rtOther = ((RyaTypeWritable) o).ryatype;
+        if (rtThis == null) {
+            return rtOther == null;
+        }
+        else {
+            return rtThis.equals(rtOther);
+        }
+    }
+
+    /** Hash of the enclosed RyaType, or 0 when none is set. */
+    @Override
+    public int hashCode() {
+        if (ryatype == null) {
+            return 0;
+        }
+        else {
+            return ryatype.hashCode();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
deleted file mode 100644
index 445499d..0000000
--- a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.ArrayList;
-import java.util.List;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.spark.graphx.Edge;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Writes one statement through AccumuloRyaDAO into a MockInstance-backed
- * rya_spo table and reads it back with GraphXEdgeInputFormat.
- */
-public class GraphXEdgeInputFormatTest {
-
- static String username = "root", table = "rya_spo";
- static PasswordToken password = new PasswordToken("");
-
- static Instance instance;
- static AccumuloRyaDAO apiImpl;
-
- // Creates the mock table and a Rya DAO using the "rya_" table prefix.
- @Before
- public void init() throws Exception {
- instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance");
- Connector connector = instance.getConnector(username, password);
- connector.tableOperations().create(table);
-
- AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix("rya_");
- conf.setDisplayQueryPlan(false);
-
- apiImpl = new AccumuloRyaDAO();
- apiImpl.setConf(conf);
- apiImpl.setConnector(connector);
- apiImpl.init();
- }
-
- @After
- public void after() throws Exception {
- apiImpl.dropAndDestroy();
- }
-
- /**
- * Adds one SPO statement, then expects exactly one split and two edges
- * from the record reader.
- */
- @SuppressWarnings("rawtypes")
- @Test
- public void testInputFormat() throws Exception {
- RyaStatement input = RyaStatement.builder()
- .setSubject(new RyaURI("http://www.google.com"))
- .setPredicate(new RyaURI("http://some_other_uri"))
- .setObject(new RyaURI("http://www.yahoo.com"))
- .setColumnVisibility(new byte[0])
- .setValue(new byte[0])
- .build();
-
- apiImpl.add(input);
-
- Job jobConf = Job.getInstance();
-
- GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName());
- GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password);
- GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
- // NOTE(review): the input table name is set twice; the second call is redundant.
- GraphXEdgeInputFormat.setInputTableName(jobConf, table);
- GraphXEdgeInputFormat.setInputTableName(jobConf, table);
-
- GraphXEdgeInputFormat.setScanIsolation(jobConf, false);
- GraphXEdgeInputFormat.setLocalIterators(jobConf, false);
- GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false);
-
- GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat();
-
- JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
-
- List<InputSplit> splits = inputFormat.getSplits(context);
-
- Assert.assertEquals(1, splits.size());
-
- TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
-
- RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
-
- // NOTE(review): this reader is never closed.
- RecordReader ryaStatementRecordReader = (RecordReader) reader;
- ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
-
- // Only edge endpoints are kept; the edge attribute is replaced with null.
- List<Edge> results = new ArrayList<Edge>();
- while(ryaStatementRecordReader.nextKeyValue()) {
- Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue();
- long srcId = writable.srcId();
- long destId = writable.dstId();
- RyaTypeWritable rtw = null;
- Object text = ryaStatementRecordReader.getCurrentKey();
- Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, destId, rtw);
- results.add(edge);
-
- System.out.println(text);
- }
-
- System.out.println(results.size());
- System.out.println(results);
- Assert.assertTrue(results.size() == 2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
deleted file mode 100644
index a31b27f..0000000
--- a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.ArrayList;
-import java.util.List;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Writes one statement with the entity-centric index enabled
- * ("sc.use_entity") and reads the rya_eci table back with GraphXInputFormat.
- */
-public class GraphXInputFormatTest {
-
- private String username = "root", table = "rya_eci";
- private PasswordToken password = new PasswordToken("");
-
- private Instance instance;
- private AccumuloRyaDAO apiImpl;
-
- // Creates the mock table and a Rya DAO with the entity index turned on.
- @Before
- public void init() throws Exception {
- instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance");
- Connector connector = instance.getConnector(username, password);
- connector.tableOperations().create(table);
-
- AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix("rya_");
- conf.setDisplayQueryPlan(false);
- conf.setBoolean("sc.use_entity", true);
-
- apiImpl = new AccumuloRyaDAO();
- apiImpl.setConf(conf);
- apiImpl.setConnector(connector);
- apiImpl.init();
- }
-
- @After
- public void after() throws Exception {
- apiImpl.dropAndDestroy();
- }
-
- /**
- * Adds one statement and reads it back. Only the split count is
- * asserted; the result assertions at the end are commented out.
- */
- @Test
- public void testInputFormat() throws Exception {
- RyaStatement input = RyaStatement.builder()
- .setSubject(new RyaURI("http://www.google.com"))
- .setPredicate(new RyaURI("http://some_other_uri"))
- .setObject(new RyaURI("http://www.yahoo.com"))
- .setColumnVisibility(new byte[0])
- .setValue(new byte[0])
- .build();
-
- apiImpl.add(input);
-
- Job jobConf = Job.getInstance();
-
- GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
- GraphXInputFormat.setConnectorInfo(jobConf, username, password);
- // NOTE(review): the input table name is set twice; the second call is redundant.
- GraphXInputFormat.setInputTableName(jobConf, table);
- GraphXInputFormat.setInputTableName(jobConf, table);
-
- GraphXInputFormat.setScanIsolation(jobConf, false);
- GraphXInputFormat.setLocalIterators(jobConf, false);
- GraphXInputFormat.setOfflineTableScan(jobConf, false);
-
- GraphXInputFormat inputFormat = new GraphXInputFormat();
-
- JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
-
- List<InputSplit> splits = inputFormat.getSplits(context);
-
- Assert.assertEquals(1, splits.size());
-
- TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
-
- RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
-
- // NOTE(review): this reader is never closed.
- RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
- ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
-
- // Copies each returned RyaType's data and datatype into a new RyaType.
- List<RyaType> results = new ArrayList<RyaType>();
- System.out.println("before while");
- while(ryaStatementRecordReader.nextKeyValue()) {
- System.out.println("in while");
- RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue();
- RyaType value = writable.getRyaType();
- Object text = ryaStatementRecordReader.getCurrentKey();
- RyaType type = new RyaType();
- type.setData(value.getData());
- type.setDataType(value.getDataType());
- results.add(type);
-
- System.out.println(value.getData());
- System.out.println(value.getDataType());
- System.out.println(results);
- System.out.println(type);
- System.out.println(text);
- System.out.println(value);
- }
- System.out.println("after while");
-
- System.out.println(results.size());
- System.out.println(results);
- // NOTE(review): results holds RyaTypes, so contains(input) (a RyaStatement) could never be true.
-// Assert.assertTrue(results.size() == 2);
-// Assert.assertTrue(results.contains(input));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
new file mode 100644
index 0000000..6686c8f
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
@@ -0,0 +1,134 @@
+package org.apache.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.graphx.Edge;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Writes one statement through AccumuloRyaDAO into a MockInstance-backed
+ * rya_spo table and reads it back as GraphX edges with GraphXEdgeInputFormat.
+ */
+public class GraphXEdgeInputFormatTest {
+
+    private String username = "root", table = "rya_spo";
+    private PasswordToken password = new PasswordToken("");
+
+    private Instance instance;
+    private AccumuloRyaDAO apiImpl;
+
+    /** Creates the mock table and a Rya DAO using the "rya_" table prefix. */
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    /**
+     * Adds one SPO statement, then expects exactly one split and two edges
+     * from the record reader. The reader is always closed.
+     */
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testInputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI("http://www.google.com"))
+                .setPredicate(new RyaURI("http://some_other_uri"))
+                .setObject(new RyaURI("http://www.yahoo.com"))
+                .setColumnVisibility(new byte[0])
+                .setValue(new byte[0])
+                .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
+        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
+
+        GraphXEdgeInputFormat.setScanIsolation(jobConf, false);
+        GraphXEdgeInputFormat.setLocalIterators(jobConf, false);
+        GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+        List<Edge> results = new ArrayList<Edge>();
+        try {
+            reader.initialize(splits.get(0), taskAttemptContext);
+            while (reader.nextKeyValue()) {
+                Edge edge = (Edge) reader.getCurrentValue();
+                // Only the endpoints are kept; the edge attribute is not compared.
+                results.add(new Edge<RyaTypeWritable>(edge.srcId(), edge.dstId(), null));
+            }
+        } finally {
+            reader.close();
+        }
+
+        Assert.assertEquals(2, results.size());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
new file mode 100644
index 0000000..b2a663c
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
@@ -0,0 +1,142 @@
+package org.apache.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Writes one statement with the entity-centric index enabled
+ * ("sc.use_entity") and reads the rya_eci table back with GraphXInputFormat.
+ */
+public class GraphXInputFormatTest {
+
+    private String username = "root", table = "rya_eci";
+    private PasswordToken password = new PasswordToken("");
+
+    private Instance instance;
+    private AccumuloRyaDAO apiImpl;
+
+    /** Creates the mock table and a Rya DAO with the entity index turned on. */
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+        conf.setBoolean("sc.use_entity", true);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    /**
+     * Adds one statement and reads it back, checking the split count and that
+     * every record carries a value. The reader is always closed.
+     */
+    @Test
+    public void testInputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI("http://www.google.com"))
+                .setPredicate(new RyaURI("http://some_other_uri"))
+                .setObject(new RyaURI("http://www.yahoo.com"))
+                .setColumnVisibility(new byte[0])
+                .setValue(new byte[0])
+                .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        GraphXInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXInputFormat.setInputTableName(jobConf, table);
+
+        GraphXInputFormat.setScanIsolation(jobConf, false);
+        GraphXInputFormat.setLocalIterators(jobConf, false);
+        GraphXInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXInputFormat inputFormat = new GraphXInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+        RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader) reader;
+
+        List<RyaType> results = new ArrayList<RyaType>();
+        try {
+            ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+            while (ryaStatementRecordReader.nextKeyValue()) {
+                RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue();
+                Assert.assertNotNull(writable);
+                RyaType value = writable.getRyaType();
+                RyaType type = new RyaType();
+                type.setData(value.getData());
+                type.setDataType(value.getDataType());
+                results.add(type);
+            }
+        } finally {
+            ryaStatementRecordReader.close();
+        }
+
+        // NOTE(review): no expectation on `results` yet. The previous, commented-out
+        // check results.contains(input) compared RyaTypes with a RyaStatement and
+        // could never pass; add a real assertion once the ECI row layout is confirmed.
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5ab29aa..43cf6f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@ under the License.
<module>osgi</module>
<module>pig</module>
<module>sail</module>
+ <module>spark</module>
<module>web</module>
</modules>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 02bdb37..04c8bb5 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -24,11 +24,11 @@ under the License.
<parent>
<groupId>org.apache.rya</groupId>
<artifactId>rya-project</artifactId>
- <version>3.2.10-SNAPSHOT</version>
+ <version>3.2.10-incubating-SNAPSHOT</version>
</parent>
<artifactId>rya.spark</artifactId>
- <name>Apache Rya MapReduce Tools</name>
+ <name>Apache Rya Spark Support</name>
<dependencies>
<dependency>
@@ -39,7 +39,7 @@ under the License.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
- <version>1.2.2</version>
+ <version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java b/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
deleted file mode 100644
index f4b7860..0000000
--- a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package mvm.rya.accumulo.spark;
-
-import java.io.IOException;
-
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.GraphXEdgeInputFormat;
-import mvm.rya.accumulo.mr.GraphXInputFormat;
-import mvm.rya.accumulo.mr.MRUtils;
-import mvm.rya.accumulo.mr.RyaInputFormat;
-import mvm.rya.accumulo.mr.RyaTypeWritable;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.graphx.Edge;
-import org.apache.spark.graphx.Graph;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.storage.StorageLevel;
-
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import com.google.common.base.Preconditions;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class GraphXGraphGenerator {
-
- public String zk;
- public String instance;
- public String userName;
- public String pwd;
- public boolean mock;
- public String tablePrefix;
- public Authorizations authorizations;
-
- public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
- // Load configuration parameters
- zk = MRUtils.getACZK(conf);
- instance = MRUtils.getACInstance(conf);
- userName = MRUtils.getACUserName(conf);
- pwd = MRUtils.getACPwd(conf);
- mock = MRUtils.getACMock(conf, false);
- tablePrefix = MRUtils.getTablePrefix(conf);
- // Set authorizations if specified
- String authString = conf.get(MRUtils.AC_AUTH_PROP);
- if (authString != null && !authString.isEmpty()) {
- authorizations = new Authorizations(authString.split(","));
- conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
- }
- else {
- authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
- }
- // Set table prefix to the default if not set
- if (tablePrefix == null) {
- tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
- MRUtils.setTablePrefix(conf, tablePrefix);
- }
- // Check for required configuration parameters
- Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
- Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
- Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
- Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
- RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
- // If connecting to real accumulo, set additional parameters and require zookeepers
- if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
- // Ensure consistency between alternative configuration properties
- conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
- conf.set(ConfigUtils.CLOUDBASE_USER, userName);
- conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
- conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
- conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
-
- Job job = Job.getInstance(conf, sc.appName());
-
- ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
-
- //maybe ask conf for correct suffix?
- GraphXInputFormat.setInputTableName(job, EntityCentricIndex.CONF_TABLE_SUFFIX);
- GraphXInputFormat.setConnectorInfo(job, userName, pwd);
- GraphXInputFormat.setZooKeeperInstance(job, clientConfig);
- GraphXInputFormat.setScanAuthorizations(job, authorizations);
-
- return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
- }
-
- public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
- // Load configuration parameters
- zk = MRUtils.getACZK(conf);
- instance = MRUtils.getACInstance(conf);
- userName = MRUtils.getACUserName(conf);
- pwd = MRUtils.getACPwd(conf);
- mock = MRUtils.getACMock(conf, false);
- tablePrefix = MRUtils.getTablePrefix(conf);
- // Set authorizations if specified
- String authString = conf.get(MRUtils.AC_AUTH_PROP);
- if (authString != null && !authString.isEmpty()) {
- authorizations = new Authorizations(authString.split(","));
- conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
- }
- else {
- authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
- }
- // Set table prefix to the default if not set
- if (tablePrefix == null) {
- tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
- MRUtils.setTablePrefix(conf, tablePrefix);
- }
- // Check for required configuration parameters
- Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
- Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
- Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
- Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
- RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
- // If connecting to real accumulo, set additional parameters and require zookeepers
- if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
- // Ensure consistency between alternative configuration properties
- conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
- conf.set(ConfigUtils.CLOUDBASE_USER, userName);
- conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
- conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
- conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
-
- Job job = Job.getInstance(conf, sc.appName());
-
- ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
-
- RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
- RyaInputFormat.setConnectorInfo(job, userName, pwd);
- RyaInputFormat.setZooKeeperInstance(job, clientConfig);
- RyaInputFormat.setScanAuthorizations(job, authorizations);
- return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class);
- }
-
- public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
- StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY();
- StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY();
- ClassTag<RyaTypeWritable> RTWTag = null;
- RyaTypeWritable rtw = null;
- RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf);
-
- RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf);
- JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD();
- JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> tuple._2);
-
- RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd);
-
- return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, storageLvl2, RTWTag, RTWTag);
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java b/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
new file mode 100644
index 0000000..b1889b8
--- /dev/null
+++ b/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
@@ -0,0 +1,202 @@
+package org.apache.rya.accumulo.spark;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.rya.accumulo.AccumuloRdfConstants;
+import org.apache.rya.accumulo.mr.GraphXEdgeInputFormat;
+import org.apache.rya.accumulo.mr.GraphXInputFormat;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.accumulo.mr.RyaInputFormat;
+import org.apache.rya.accumulo.mr.RyaTypeWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.RdfCloudTripleStoreUtils;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.graphx.Graph;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.storage.StorageLevel;
+
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Builds GraphX graphs from Rya data stored in Accumulo: vertices are read from the entity-centric
+ * index table and edges from the SPO table.
+ * <p>
+ * Every public method reads its connection settings from the supplied {@link Configuration}, stores
+ * them in the public fields below, and writes them back into that configuration under the alternative
+ * property names used elsewhere in Rya.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class GraphXGraphGenerator {
+
+ public String zk;
+ public String instance;
+ public String userName;
+ public String pwd;
+ public boolean mock;
+ public String tablePrefix;
+ public Authorizations authorizations;
+
+ /**
+ * Creates an RDD of vertices read from the entity-centric index table.
+ *
+ * @param sc Spark context that creates the RDD; its application name becomes the Hadoop job name
+ * @param conf Rya/Accumulo configuration; validated and updated in place
+ * @return (vertex id, vertex value) pairs as produced by {@link GraphXInputFormat}
+ * @throws IOException if the Hadoop job cannot be created
+ * @throws AccumuloSecurityException if the connector credentials cannot be set
+ */
+ public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+ Job job = createJob(sc, conf);
+ GraphXInputFormat.setInputTableName(job, EntityCentricIndex.getTableName(conf));
+ return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
+ }
+
+ /**
+ * Creates an RDD of edges read from the SPO table.
+ *
+ * @param sc Spark context that creates the RDD; its application name becomes the Hadoop job name
+ * @param conf Rya/Accumulo configuration; validated and updated in place
+ * @return key/edge pairs as produced by {@link GraphXEdgeInputFormat}
+ * @throws IOException if the Hadoop job cannot be created
+ * @throws AccumuloSecurityException if the connector credentials cannot be set
+ */
+ public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+ Job job = createJob(sc, conf);
+ RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
+ String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(TABLE_LAYOUT.SPO, tablePrefix);
+ InputFormatBase.setInputTableName(job, tableName);
+ return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class);
+ }
+
+ /**
+ * Builds a GraphX graph from {@link #getVertexRDD} and {@link #getEdgeRDD}, using
+ * {@code MEMORY_ONLY} storage for both edges and vertices.
+ * <p>
+ * The default vertex attribute passed to GraphX is {@code null}, so vertices that are referenced by
+ * an edge but absent from the vertex RDD get a {@code null} value.
+ *
+ * @param sc Spark context that creates the RDDs
+ * @param conf Rya/Accumulo configuration; validated and updated in place
+ * @return the assembled graph
+ * @throws IOException if a Hadoop job cannot be created
+ * @throws AccumuloSecurityException if the connector credentials cannot be set
+ */
+ public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+ StorageLevel edgeStorage = StorageLevel.MEMORY_ONLY();
+ StorageLevel vertexStorage = StorageLevel.MEMORY_ONLY();
+ ClassTag<RyaTypeWritable> rtwTag = ClassTag$.MODULE$.apply(RyaTypeWritable.class);
+ RyaTypeWritable defaultVertexAttr = null;
+ RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf);
+
+ // Graph.apply expects bare edges, so drop the keys emitted by the edge input format.
+ JavaRDD<Tuple2<Object, Edge>> keyedEdges = getEdgeRDD(sc, conf).toJavaRDD();
+ JavaRDD<Edge<RyaTypeWritable>> edges = keyedEdges.map(tuple -> tuple._2);
+
+ return Graph.apply(vertexRDD, JavaRDD.toRDD(edges), defaultVertexAttr, edgeStorage, vertexStorage, rtwTag, rtwTag);
+ }
+
+ /**
+ * Loads and validates the connection settings, then creates a job whose Accumulo input configuration
+ * carries the connector credentials, the instance (mock or ZooKeeper) and the scan authorizations.
+ */
+ private Job createJob(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException {
+ loadConfiguration(conf);
+ Job job = Job.getInstance(conf, sc.appName());
+ InputFormatBase.setConnectorInfo(job, userName, new PasswordToken(pwd));
+ if (mock) {
+ // Honor the mock flag: point the input format at Accumulo's MockInstance with this instance name.
+ InputFormatBase.setMockInstance(job, instance);
+ } else {
+ ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+ InputFormatBase.setZooKeeperInstance(job, clientConfig);
+ }
+ InputFormatBase.setScanAuthorizations(job, authorizations);
+ return job;
+ }
+
+ /**
+ * Reads the connection settings from {@code conf} into this object's fields (defaulting the table
+ * prefix and the authorizations when they are not set), checks that the required settings are
+ * present, applies the table prefix via {@code RdfCloudTripleStoreConstants.prefixTables}, and writes
+ * the settings back to {@code conf} under the alternative property names.
+ *
+ * @throws NullPointerException if a required setting is missing
+ */
+ private void loadConfiguration(Configuration conf) {
+ // Load configuration parameters
+ zk = MRUtils.getACZK(conf);
+ instance = MRUtils.getACInstance(conf);
+ userName = MRUtils.getACUserName(conf);
+ pwd = MRUtils.getACPwd(conf);
+ mock = MRUtils.getACMock(conf, false);
+ tablePrefix = MRUtils.getTablePrefix(conf);
+ // Set authorizations if specified
+ String authString = conf.get(MRUtils.AC_AUTH_PROP);
+ if (authString != null && !authString.isEmpty()) {
+ authorizations = new Authorizations(authString.split(","));
+ conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+ } else {
+ authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+ }
+ // Set table prefix to the default if not set
+ if (tablePrefix == null) {
+ tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+ MRUtils.setTablePrefix(conf, tablePrefix);
+ }
+ // Check for required configuration parameters
+ Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
+ Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
+ Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
+ Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+ // A real (non-mock) instance is reached through ZooKeeper, so the hosts are required
+ if (!mock) {
+ Preconditions.checkNotNull(zk, "Zookeeper hosts [" + MRUtils.AC_ZK_PROP + "] not set.");
+ conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
+ }
+ RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+ // Ensure consistency between alternative configuration properties
+ conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+ conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+ conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+ conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+ conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
+ }
+}