You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/12/15 19:37:44 UTC
incubator-rya git commit: RYA-224 Fluo Visibility Join Bug Fix;
Closes #123
Repository: incubator-rya
Updated Branches:
refs/heads/master cf702ffa8 -> 22bdc7a2e
RYA-224 Fluo Visibility Join Bug Fix; Closes #123
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/22bdc7a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/22bdc7a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/22bdc7a2
Branch: refs/heads/master
Commit: 22bdc7a2e16f94c9834c51be815fe3443f133650
Parents: cf702ff
Author: Caleb Meier <me...@gmail.com>
Authored: Tue Nov 22 11:39:09 2016 -0500
Committer: pujav65 <pu...@gmail.com>
Committed: Thu Dec 15 14:37:15 2016 -0500
----------------------------------------------------------------------
.../pcj/fluo/app/JoinResultUpdater.java | 3 +-
.../pcj/fluo/integration/StreamingTestIT.java | 134 +++++++++++++++++++
2 files changed, 136 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/22bdc7a2/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 9084cd6..39dcc16 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -151,7 +151,8 @@ public class JoinResultUpdater {
// Get the Binding strings
final String childBindingSetString = valueConverter.convert(childBindingSet, childVarOrder);
- final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString);
+ String[] childBindingArray = childBindingSetString.split("\u0001");
+ final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]);
// Create the prefix that will be used to scan for binding sets of the sibling node.
// This prefix includes the sibling Node ID and the common variable values from
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/22bdc7a2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
new file mode 100644
index 0000000..2573925
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
+
+
+public class StreamingTestIT extends ITBase {
+
+ private static final Logger log = Logger.getLogger(ITBase.class);
+ private static String query = "select ?name ?uuid where { ?uuid <http://pred1> ?name ; <http://pred2> \"literal\".}";
+ private static String uuidPrefix = "http://uuid_";
+ private static String name = "number_";
+ private static String pred1 = "http://pred1";
+ private static String pred2 = "http://pred2";
+
+ private PcjTables pcjTables = new PcjTables();
+ private String pcjTableName;
+
+ private Sail sail;
+ private SailRepository repo;
+ private SailRepositoryConnection conn;
+
+
+ @Before
+ public void init() throws Exception {
+ AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
+ conf.set(ConfigUtils.CLOUDBASE_AUTHS, "U");
+ sail = RyaSailFactory.getInstance(conf);
+ repo = new SailRepository(sail);
+ conn = repo.getConnection();
+ }
+
+ @After
+ public void close() throws RepositoryException, SailException {
+ conn.close();
+ repo.shutDown();
+ sail.shutDown();
+ }
+
+
+ @Test
+ public void testRandomStreamingIngest() throws Exception {
+
+ pcjTableName = createPcj(query);
+ log.info("Adding Join Pairs...");
+ addRandomQueryStatementPairs(100);
+ Assert.assertEquals(100, countPcjs());
+
+ }
+
+ private String createPcj(String pcj) throws Exception {
+ accumuloConn.securityOperations().changeUserAuthorizations("root", new Authorizations("U"));
+ // Create the PCJ table.
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+ final String pcjId = pcjStorage.createPcj(pcj);
+ new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo);
+ String tableName = RYA_INSTANCE_NAME + "INDEX_" + pcjId;
+
+ return tableName;
+ }
+
+ private void addRandomQueryStatementPairs(int numPairs) throws Exception {
+
+ Set<Statement> statementPairs = new HashSet<>();
+ for (int i = 0; i < numPairs; i++) {
+ String uri = uuidPrefix + UUID.randomUUID().toString();
+ Statement statement1 = new StatementImpl(new URIImpl(uri), new URIImpl(pred1),
+ new LiteralImpl(name + (i + 1)));
+ Statement statement2 = new StatementImpl(new URIImpl(uri), new URIImpl(pred2), new LiteralImpl("literal"));
+ statementPairs.add(statement1);
+ statementPairs.add(statement2);
+ }
+ conn.add(statementPairs, new Resource[0]);
+ fluo.waitForObservers();
+ }
+
+ private int countPcjs() throws Exception {
+ Iterable<BindingSet> bindingsets = pcjTables.listResults(accumuloConn, pcjTableName, new Authorizations("U"));
+ int count = 0;
+ for (BindingSet bs : bindingsets) {
+// System.out.println(bs);
+ count++;
+ }
+// IncUpdateDAO.printAll(fluoClient);
+ return count;
+ }
+
+
+}