You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/12/05 19:49:23 UTC
[2/4] incubator-rya git commit: RYA-406. Closes #251.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
new file mode 100644
index 0000000..e095309
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds or removes a hash to or from the rowId for sharding purposes. When creating a sharded row key, it
+ * takes the form: node_prefix:shardId:nodeId//Binding_values. For example, the row key generated from nodeId = SP_123,
+ * varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob)
+ * indicates the shard id hash generated from the Binding value "uri:Bob".
+ *
+ */
+public class BindingHashShardingFunction {
+
+    private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
+    // Number of base 36 characters that make up the shard id.
+    private static final int HASH_LEN = 4;
+    // Length of the ":<hash>:" segment that sits directly behind the node prefix in a sharded row.
+    private static final int SHARD_SEGMENT_LEN = HASH_LEN + 2;
+
+    /**
+     * Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For
+     * example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
+     * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
+     * Binding value "uri:Bob".
+     * <p>
+     * The shard id is hashed from the id portion of the nodeId, the nodeId/BindingSet delimiter and the
+     * serialized value of the first variable in the VariableOrder. When the VariableOrder is empty, the
+     * serialized value is the empty string and the row ends with the bare id.
+     *
+     * @param nodeId - Node Id with type and UUID, of the form prefix_id (exactly one '_')
+     * @param varOrder - VarOrder used to order BindingSet values
+     * @param bs - BindingSet with partially formed query values
+     * @return - serialized Bytes rowId for storing BindingSet results in Fluo
+     * @throws IllegalArgumentException if nodeId does not split into exactly two parts on '_'
+     */
+    public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) {
+        checkNotNull(nodeId);
+        checkNotNull(varOrder);
+        checkNotNull(bs);
+        String[] rowPrefixAndId = nodeId.split("_");
+        Preconditions.checkArgument(rowPrefixAndId.length == 2,
+                "Expected nodeId of the form prefix_id but was " + nodeId);
+        String prefix = rowPrefixAndId[0];
+        String id = rowPrefixAndId[1];
+
+        String firstBindingString = "";
+        Bytes rowSuffix = Bytes.of(id);
+        if (varOrder.getVariableOrders().size() > 0) {
+            // Only the first variable feeds the hash; the full variable order builds the row suffix.
+            VariableOrder first = new VariableOrder(varOrder.getVariableOrders().get(0));
+            firstBindingString = BS_CONVERTER.convert(bs, first);
+            rowSuffix = RowKeyUtil.makeRowKey(id, varOrder, bs);
+        }
+
+        BytesBuilder builder = Bytes.builder();
+        builder.append(Bytes.of(prefix + ":"));
+        builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + firstBindingString)));
+        builder.append(":");
+        builder.append(rowSuffix);
+        return builder.toBytes();
+    }
+
+    /**
+     * Generates a sharded scan prefix from the indicated nodeId and first Binding value. The value is converted
+     * to a {@link RyaType} and serialized as data + TYPE_DELIM + datatype before delegating to
+     * {@link #getShardedScanPrefix(String, String)}.
+     *
+     * @param nodeId - Node Id with type and UUID
+     * @param firstBsVal - Value bound to the first variable of the VariableOrder
+     * @return - serialized Bytes prefix for scanning rows
+     */
+    public static Bytes getShardedScanPrefix(String nodeId, Value firstBsVal) {
+        checkNotNull(nodeId);
+        checkNotNull(firstBsVal);
+
+        final RyaType ryaValue = RdfToRyaConversions.convertValue(firstBsVal);
+        final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType();
+
+        return getShardedScanPrefix(nodeId, bindingString);
+    }
+
+    /**
+     * Generates a sharded scan prefix from the indicated nodeId and bindingString. The result has the form
+     * node_prefix:shardId:nodeId//bindingString, where the shard id is hashed from the id portion of the nodeId,
+     * the nodeId/BindingSet delimiter and the bindingString. For example, nodeId = SP_123 and
+     * bindingString = uri:Bob give SP:HASH(uri:Bob):123//uri:Bob.
+     *
+     * @param nodeId - NodeId with type and UUID, of the form prefix_id (exactly one '_')
+     * @param bindingString - String representation of BindingSet values, as formed by {@link BindingSetStringConverter}
+     * @return - serialized, sharded Bytes prefix
+     * @throws IllegalArgumentException if nodeId does not split into exactly two parts on '_'
+     */
+    public static Bytes getShardedScanPrefix(String nodeId, String bindingString) {
+        checkNotNull(nodeId);
+        checkNotNull(bindingString);
+        String[] rowPrefixAndId = nodeId.split("_");
+        Preconditions.checkArgument(rowPrefixAndId.length == 2,
+                "Expected nodeId of the form prefix_id but was " + nodeId);
+        String prefix = rowPrefixAndId[0];
+        String id = rowPrefixAndId[1];
+
+        BytesBuilder builder = Bytes.builder();
+        builder.append(prefix + ":");
+        builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + bindingString)));
+        builder.append(":");
+        builder.append(id);
+        builder.append(NODEID_BS_DELIM);
+        builder.append(bindingString);
+        return builder.toBytes();
+    }
+
+    /**
+     * Checks that the bytes directly behind the prefix have the shape ":hash:", where hash is HASH_LEN
+     * characters drawn from [a-z0-9]. The caller must already have verified that row holds at least
+     * prefixBytes.length() + SHARD_SEGMENT_LEN bytes.
+     */
+    private static boolean hasHash(Bytes prefixBytes, Bytes row) {
+        final int hashStart = prefixBytes.length() + 1;
+        // Exclusive end of the hash, which is also the index of the trailing ':'.
+        final int hashEnd = hashStart + HASH_LEN;
+
+        if (row.byteAt(prefixBytes.length()) != ':' || row.byteAt(hashEnd) != ':') {
+            return false;
+        }
+
+        // Validate every one of the HASH_LEN hash characters, including the last one.
+        for (int i = hashStart; i < hashEnd; i++) {
+            byte b = row.byteAt(i);
+            boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9');
+            if (!isAlphaNum) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Removes the ":hash:" shard segment from a sharded row and rejoins the prefix and the remainder of the
+     * row with '_'.
+     *
+     * @param prefixBytes - node prefix that the row is expected to start with
+     * @param row - sharded row of the form prefix:hash:remainder
+     * @return Returns prefix + "_" + remainder, i.e. the input with the shard segment replaced by '_'.
+     * @throws IllegalArgumentException if the row is too short, does not start with the prefix, or does not
+     *             carry a well-formed hash segment behind the prefix
+     */
+    public static Bytes removeHash(Bytes prefixBytes, Bytes row) {
+        checkNotNull(prefixBytes);
+        checkNotNull(row);
+        checkArgument(row.length() >= prefixBytes.length() + SHARD_SEGMENT_LEN, "Row is shorter than expected " + row);
+        checkArgument(row.subSequence(0, prefixBytes.length()).equals(prefixBytes),
+                "Row does not have expected prefix " + row);
+        checkArgument(hasHash(prefixBytes, row), "Row does not have expected hash " + row);
+
+        BytesBuilder builder = Bytes.builder();
+        builder.append(prefixBytes);
+        builder.append("_");
+        builder.append(row.subSequence(prefixBytes.length() + SHARD_SEGMENT_LEN, row.length()));
+        return builder.toBytes();
+    }
+
+    /**
+     * Hashes the given bytes with murmur3_32, clears the sign bit and renders the result in base 36,
+     * left-padded with '0' and truncated to its last HASH_LEN characters.
+     */
+    private static String genHash(Bytes row) {
+        int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
+        hash = hash & 0x7fffffff;
+        // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is
+        // nice for debugging.
+        String hashString = Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
+        hashString = hashString.substring(hashString.length() - HASH_LEN);
+        return hashString;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
new file mode 100644
index 0000000..d451d28
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TRIPLE_PREFIX;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application. The Triple prefix is added
+ * to support range-based compactions for removing transient data from the Fluo
+ * application. This prefix supports the Transient data recipe described on
+ * the Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
+
+    // TRIPLE_PREFIX followed by NODEID_BS_DELIM, written in front of every serialized triple.
+    private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of(TRIPLE_PREFIX + NODEID_BS_DELIM);
+
+    /**
+     * Builds a {@link Bytes} value made of the triple prefix followed by the provided bytes.
+     * @param tripleBytes - serialized {@link RyaStatement}; must not be null
+     * @return - the triple prefix followed by the serialized RyaStatement, as Bytes
+     */
+    public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) {
+        checkNotNull(tripleBytes);
+        final BytesBuilder prefixed = Bytes.builder();
+        prefixed.append(TRIPLE_PREFIX_BYTES);
+        prefixed.append(tripleBytes);
+        return prefixed.toBytes();
+    }
+
+    /**
+     * Drops as many leading bytes as the triple prefix is long and returns the rest as a byte array.
+     * The leading bytes are not compared against the prefix.
+     * @param prefixedTriple - serialized RyaStatement with prepended triple prefix; must not be null
+     * @return - serialized {@link RyaStatement} in byte array form
+     */
+    public static byte[] removeTriplePrefixAndConvertToByteArray(Bytes prefixedTriple) {
+        checkNotNull(prefixedTriple);
+        final int start = TRIPLE_PREFIX_BYTES.length();
+        final int end = prefixedTriple.length();
+        return prefixedTriple.subSequence(start, end).toArray();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
index 3df3708..f0faf1f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.rya.indexing.pcj.fluo.app.query;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
new file mode 100644
index 0000000..657b548
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Sets;
+
+public class StatementPatternIdCacheTest {
+
+    /**
+     * Asks one cache for the statement pattern ids twice and checks that both calls return the single
+     * mocked node id while the id column is read from the transaction exactly once.
+     */
+    @Test
+    public void testCache() {
+        final String spId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        final Bytes idRow = Bytes.of(STATEMENT_PATTERN_ID);
+
+        final Transaction tx = Mockito.mock(Transaction.class);
+        when(tx.get(idRow, STATEMENT_PATTERN_IDS)).thenReturn(Bytes.of(spId));
+        when(tx.get(idRow, STATEMENT_PATTERN_IDS_HASH)).thenReturn(Bytes.of("123"));
+
+        final StatementPatternIdCache cache = new StatementPatternIdCache();
+        final Set<String> firstLookup = cache.getStatementPatternIds(tx);
+        final Set<String> secondLookup = cache.getStatementPatternIds(tx);
+
+        Assert.assertEquals(firstLookup, secondLookup);
+        Assert.assertEquals(Sets.newHashSet(spId), firstLookup);
+
+        Mockito.verify(tx, Mockito.times(1)).get(idRow, STATEMENT_PATTERN_IDS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
new file mode 100644
index 0000000..729cdeb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class BindingHashShardingFunctionTest {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+
+    /**
+     * Adding a shard to a row and removing it again must give back the unsharded row key.
+     */
+    @Test
+    public void shardAddAndRemoveTest() {
+        final String spNodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        final VisibilityBindingSet bindings = entityLocationBindingSet();
+        final VariableOrder order = new VariableOrder("entity","location");
+
+        final Bytes unsharded = RowKeyUtil.makeRowKey(spNodeId, order, bindings);
+        final Bytes sharded = BindingHashShardingFunction.addShard(spNodeId, order, bindings);
+
+        final Bytes roundTripped = BindingHashShardingFunction.removeHash(Bytes.of(SP_PREFIX), sharded);
+        Assert.assertEquals(unsharded, roundTripped);
+    }
+
+    /**
+     * A BindingSetRow parsed from a sharded row must equal the one parsed from the unsharded row.
+     */
+    @Test
+    public void bindingSetRowTest() {
+        final String spNodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        final VisibilityBindingSet bindings = entityLocationBindingSet();
+        final VariableOrder order = new VariableOrder("entity","location");
+
+        final Bytes unsharded = RowKeyUtil.makeRowKey(spNodeId, order, bindings);
+        final Bytes sharded = BindingHashShardingFunction.addShard(spNodeId, order, bindings);
+
+        final BindingSetRow fromUnsharded = BindingSetRow.make(unsharded);
+        final BindingSetRow fromSharded = BindingSetRow.makeFromShardedRow(Bytes.of(SP_PREFIX), sharded);
+        Assert.assertEquals(fromUnsharded, fromSharded);
+    }
+
+    // Builds the fixture shared by both tests: entity = <urn:entity>, location = "location_1".
+    private static VisibilityBindingSet entityLocationBindingSet() {
+        final QueryBindingSet raw = new QueryBindingSet();
+        raw.addBinding("entity", vf.createURI("urn:entity"));
+        raw.addBinding("location", vf.createLiteral("location_1"));
+        return new VisibilityBindingSet(raw);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
new file mode 100644
index 0000000..ac41dc8
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.Arrays;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TriplePrefixUtilsTest {
+
+    /**
+     * Adding the triple prefix and removing it again must return the original bytes.
+     */
+    @Test
+    public void testAddRemovePrefix() throws TripleRowResolverException {
+        final byte[] original = Bytes.of("triple").toArray();
+
+        final Bytes withPrefix = TriplePrefixUtils.addTriplePrefixAndConvertToBytes(original);
+        final byte[] roundTripped = TriplePrefixUtils.removeTriplePrefixAndConvertToByteArray(withPrefix);
+
+        final boolean sameContent = Arrays.equals(original, roundTripped);
+        Assert.assertEquals(true, sameContent);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index c038dbe..e594474 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -1,14 +1,21 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- you under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
index 66aa04b..7e882e9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -41,7 +41,7 @@ import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
@@ -54,13 +54,16 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import com.google.common.base.Optional;
@@ -70,6 +73,7 @@ public class BatchIT extends RyaExportITBase {
private static final Logger log = Logger.getLogger(BatchIT.class);
private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+ private static final ValueFactory vf = new ValueFactoryImpl();
@Test
public void simpleScanDelete() throws Exception {
@@ -89,16 +93,16 @@ public class BatchIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj()
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
// Verify the end results of the query match the expected results.
getMiniFluo().waitForObservers();
@@ -129,22 +133,26 @@ public class BatchIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj()
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
String joinId = ids.get(2);
String rightSp = ids.get(4);
QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("subject", new URIImpl("urn:subject_1"));
- bs.addBinding("object1", new URIImpl("urn:object_0"));
+ bs.addBinding("subject", vf.createURI("urn:subject_1"));
+ bs.addBinding("object1", vf.createURI("urn:object_0"));
VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
- Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+ //create sharded span for deletion
+ URI uri = vf.createURI("urn:subject_1");
+ Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(rightSp, uri);
+ Span span = Span.prefix(prefixBytes);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 25, 5, 5));
@@ -175,21 +183,24 @@ public class BatchIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj()
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
String joinId = ids.get(2);
String rightSp = ids.get(4);
QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("subject", new URIImpl("urn:subject_1"));
- bs.addBinding("object1", new URIImpl("urn:object_0"));
+ bs.addBinding("subject", vf.createURI("urn:subject_1"));
+ bs.addBinding("object1", vf.createURI("urn:object_0"));
VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
- Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+ URI uri = vf.createURI("urn:subject_1");
+ Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(rightSp, uri);
+ Span span = Span.prefix(prefixBytes);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 5));
@@ -214,7 +225,7 @@ public class BatchIT extends RyaExportITBase {
RyaURI subj = new RyaURI("urn:subject_1");
RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-
+
Set<RyaStatement> statements1 = getRyaStatements(statement1, 15);
Set<RyaStatement> statements2 = getRyaStatements(statement2, 15);
@@ -224,22 +235,21 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
- String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj(5, 5)
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(225, 225, 225, 15, 15));
}
}
-
-
+
@Test
public void leftJoinBatchIntegrationTest() throws Exception {
final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
@@ -249,10 +259,10 @@ public class BatchIT extends RyaExportITBase {
RyaURI subj = new RyaURI("urn:subject_1");
RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-
+
subj = new RyaURI("urn:subject_2");
RyaStatement statement3 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
-
+
Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
@@ -263,37 +273,35 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
- String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj(5, 5)
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
- inserter.insert(fluoClient, statements3, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
+ inserter.insert(fluoClient, statements3, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(110, 110, 110, 20, 10));
}
}
-
-
+
@Test
public void multiJoinBatchIntegrationTest() throws Exception {
final String sparql = "SELECT ?subject1 ?subject2 ?object1 ?object2 WHERE { ?subject1 <urn:predicate_1> ?object1; "
- + " <urn:predicate_2> ?object2 ."
- + " ?subject2 <urn:predicate_3> ?object2 } ";
+ + " <urn:predicate_2> ?object2 ." + " ?subject2 <urn:predicate_3> ?object2 } ";
try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
RyaURI subj1 = new RyaURI("urn:subject_1");
RyaStatement statement1 = new RyaStatement(subj1, new RyaURI("urn:predicate_1"), null);
RyaStatement statement2 = new RyaStatement(subj1, new RyaURI("urn:predicate_2"), null);
-
+
Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
-
+
RyaURI subj2 = new RyaURI("urn:subject_2");
RyaStatement statement3 = new RyaStatement(subj2, new RyaURI("urn:predicate_3"), null);
Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
@@ -304,23 +312,22 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
- String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj(5, 5)
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
- inserter.insert(fluoClient, statements3, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
+ inserter.insert(fluoClient, statements3, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 100, 10, 10, 10));
}
}
-
private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
Set<RyaStatement> statements = new HashSet<>();
@@ -361,13 +368,19 @@ public class BatchIT extends RyaExportITBase {
for (int i = 0; i < ids.size(); i++) {
String id = ids.get(i);
String bsPrefix = prefixes.get(i);
+ URI uri = vf.createURI(bsPrefix);
+ Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(id, uri);
NodeType type = NodeType.fromNodeId(id).get();
Column bsCol = type.getResultColumn();
- String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
- Span span = Span.prefix(Bytes.of(row));
- BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
- .build();
- BatchInformationDAO.addBatch(tx, id, batch);
+ SpanBatchDeleteInformation.Builder builder = SpanBatchDeleteInformation.builder().setBatchSize(batchSize)
+ .setColumn(bsCol);
+ if (type == NodeType.JOIN) {
+ builder.setSpan(Span.prefix(type.getNodeTypePrefix()));
+ builder.setNodeId(java.util.Optional.of(id));
+ } else {
+ builder.setSpan(Span.prefix(prefixBytes));
+ }
+ BatchInformationDAO.addBatch(tx, id, builder.build());
}
tx.commit();
}
@@ -383,14 +396,19 @@ public class BatchIT extends RyaExportITBase {
private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
try (Transaction tx = fluoClient.newTransaction()) {
int count = 0;
- RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ Bytes prefixBytes = Bytes.of(type.get().getNodeTypePrefix());
+ RowScanner scanner = tx.scanner().over(Span.prefix(prefixBytes)).fetch(bsColumn).byRow().build();
Iterator<ColumnScanner> colScanners = scanner.iterator();
while (colScanners.hasNext()) {
ColumnScanner colScanner = colScanners.next();
- Iterator<ColumnValue> vals = colScanner.iterator();
- while (vals.hasNext()) {
- vals.next();
- count++;
+ BindingSetRow bsRow = BindingSetRow.makeFromShardedRow(prefixBytes, colScanner.getRow());
+ if (bsRow.getNodeId().equals(nodeId)) {
+ Iterator<ColumnValue> vals = colScanner.iterator();
+ while (vals.hasNext()) {
+ vals.next();
+ count++;
+ }
}
}
tx.commit();
@@ -405,7 +423,6 @@ public class BatchIT extends RyaExportITBase {
int expected = expectedCounts.get(i);
NodeType type = NodeType.fromNodeId(id).get();
int count = countResults(fluoClient, id, type.getResultColumn());
- log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
switch (type) {
case STATEMENT_PATTERN:
assertEquals(expected, count);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 27b8222..ef5ab34 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -76,17 +76,18 @@ public class CreateDeleteIT extends RyaExportITBase {
// Create the PCJ in Fluo and load the statements into Rya.
final String pcjId = loadData(sparql, statements);
- try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ try(FluoClient fluoClient = FluoFactory.newClient(getFluoConfiguration())) {
// Ensure the data was loaded.
final List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(18, rows.size());
+ assertEquals(19, rows.size());
// Delete the PCJ from the Fluo application.
new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
+ getMiniFluo().waitForObservers();
// Ensure all data related to the query has been removed.
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
- assertEquals(0, empty_rows.size());
+ assertEquals(1, empty_rows.size());
}
}
@@ -108,20 +109,21 @@ public class CreateDeleteIT extends RyaExportITBase {
// Create the PCJ in Fluo and load the statements into Rya.
final String pcjId = loadData(sparql, statements);
- try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ try(FluoClient fluoClient = FluoFactory.newClient(getFluoConfiguration())) {
// Ensure the data was loaded.
final List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(10, rows.size());
+ assertEquals(11, rows.size());
// Delete the PCJ from the Fluo application.
new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
+ getMiniFluo().waitForObservers();
// Ensure all data related to the query has been removed.
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
- assertEquals(0, empty_rows.size());
+ assertEquals(1, empty_rows.size());
}
}
-
+
private String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
requireNonNull(sparql);
@@ -133,14 +135,14 @@ public class CreateDeleteIT extends RyaExportITBase {
final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet());
// Write the data to Rya.
- final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
+ final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
ryaConn.begin();
ryaConn.add(statements);
ryaConn.commit();
ryaConn.close();
// Wait for the Fluo application to finish computing the end result.
- super.getMiniFluo().waitForObservers();
+ getMiniFluo().waitForObservers();
// The PCJ Id is the topic name the results will be written to.
return pcjId;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
index e61104a..eb21b34 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
@@ -109,11 +109,11 @@ public class CreateDeletePeriodicPCJ extends KafkaExportITBase {
vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")));
- runTest(query, statements, 29);
+ runTest(query, statements, 30);
}
-
+
private void runTest(String query, Collection<Statement> statements, int expectedEntries) throws Exception {
try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -134,10 +134,11 @@ public class CreateDeletePeriodicPCJ extends KafkaExportITBase {
DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, storage);
deletePeriodic.deletePeriodicQuery(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notificationClient);
+ getMiniFluo().waitForObservers();
// Ensure all data related to the query has been removed.
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
- assertEquals(0, empty_rows.size());
+ assertEquals(1, empty_rows.size());
// Ensure that Periodic Service notified to add and delete PeriodicNotification
Set<CommandNotification> notifications;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
deleted file mode 100644
index fabf512..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package org.apache.rya.indexing.pcj.fluo.integration;
-
-import static java.util.Objects.requireNonNull;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeFactory;
-
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.openrdf.model.Literal;
-import org.openrdf.model.Statement;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.repository.sail.SailRepositoryConnection;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class FluoLatencyIT extends KafkaExportITBase {
- private static ValueFactory vf;
- private static DatatypeFactory dtf;
-
- @BeforeClass
- public static void init() throws DatatypeConfigurationException {
- vf = new ValueFactoryImpl();
- dtf = DatatypeFactory.newInstance();
- }
-
- @Test
- public void resultsExported() throws Exception {
-
- final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { "
- + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "} " + "group by ?type";
-
-// final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { "
-// + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "}";
-
- try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
- // Tell the Fluo app to maintain the PCJ.
- String pcjId = FluoQueryUtils.createNewPcjId();
- FluoConfiguration conf = super.getFluoConfiguration();
- new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient);
- SailRepositoryConnection conn = super.getRyaSailRepository().getConnection();
-
- long start = System.currentTimeMillis();
- int numReturned = 0;
- int numObs = 10;
- int numTypes = 5;
- int numExpected = 0;
- int increment = numObs*numTypes;
- while (System.currentTimeMillis() - start < 60000) {
- List<Statement> statements = generate(10, 5, "car_", numExpected, ZonedDateTime.now());
- conn.add(statements);
- numExpected += increment;
- System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: "
- + getNumFluoEntries(fluoClient));
- numReturned += readAllResults(pcjId).size();
- System.out
- .println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned));
-// FluoITHelper.printFluoTable(conf);
- Thread.sleep(30000);
- }
- }
- }
-
- /**
- * Generates (numObservationsPerType x numTypes) statements of the form:
- *
- * <pre>
- * urn:obs_n uri:hasTime zonedTime
- * urn:obs_n uri:hasObsType typePrefix_m
- * </pre>
- *
- * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by
- * observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes.
- *
- * @param numObservationsPerType - The quantity of observations per type to generate.
- * @param numTypes - The number of types to generate observations for.
- * @param typePrefix - The prefix to be used for the type literal in the statement.
- * @param observationOffset - The offset to be used for determining the value of n in the above statements.
- * @param zonedTime - The time to be used for all observations generated.
- * @return A new list of all generated Statements.
- */
- public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix,
- final long observationOffset, final ZonedDateTime zonedTime) {
- final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
- final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
- final List<Statement> statements = Lists.newArrayList();
-
- for (long i = 0; i < numObservationsPerType; i++) {
- for (int j = 0; j < numTypes; j++) {
- final long observationId = observationOffset + i * numTypes + j;
- // final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId;
- // final String obsId = "urn:obs_" + observationId;
- final String obsId = "urn:obs_" + String.format("%020d", observationId);
- final String type = typePrefix + j;
- // logger.info(obsId + " " + type + " " + litTime);
- statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime));
- statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type)));
- }
- }
-
- return statements;
- }
-
- private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
- requireNonNull(pcjId);
-
- // Read all of the results from the Kafka topic.
- final Set<VisibilityBindingSet> results = new HashSet<>();
-
- try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
- final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
- final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
- while (recordIterator.hasNext()) {
- results.add(recordIterator.next().value());
- }
- }
-
- return results;
- }
-
- private int getNumAccEntries(String tableName) throws TableNotFoundException {
- Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations());
- int count = 0;
- for (Map.Entry<Key, Value> entry : scanner) {
- count++;
- }
- return count;
- }
-
- private int getNumFluoEntries(FluoClient client) {
- Transaction tx = client.newTransaction();
- CellScanner scanner = tx.scanner().build();
- int count = 0;
- for (RowColumnValue rcv : scanner) {
- count++;
- }
- return count;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index beaef32..2b87a97 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -123,9 +123,8 @@ public class QueryIT extends RyaExportITBase {
// and are skilled with computers. The resulting binding set includes everybody who
// was involved in the recruitment process.
final String sparql = "SELECT ?recruiter ?candidate ?leader " + "{ " + "?recruiter <http://recruiterFor> <http://GeekSquad>. "
- + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". "
- + "?leader <http://leaderOf> <http://GeekSquad>. " + "?recruiter <http://talksTo> ?candidate. "
- + "?candidate <http://talksTo> ?leader. " + "}";
+ + "?recruiter <http://talksTo> ?candidate. " + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". "
+ + "?candidate <http://talksTo> ?leader." + "?leader <http://leaderOf> <http://GeekSquad>. }";
// Create the Statements that will be loaded into Rya.
final ValueFactory vf = new ValueFactoryImpl();
@@ -426,10 +425,10 @@ public class QueryIT extends RyaExportITBase {
runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
-
+
@Test
public void dateTimeWithin() throws Exception {
-
+
final ValueFactory vf = new ValueFactoryImpl();
DatatypeFactory dtf = DatatypeFactory.newInstance();
FunctionRegistry.getInstance().add(new DateTimeWithinPeriod());
@@ -437,13 +436,13 @@ public class QueryIT extends RyaExportITBase {
final String sparql = "PREFIX fn: <" + FN.NAMESPACE +">"
+ "SELECT ?event ?startTime ?endTime WHERE { ?event <uri:startTime> ?startTime; <uri:endTime> ?endTime. "
+ "FILTER(fn:dateTimeWithin(?startTime, ?endTime, 2,<" + OWLTime.HOURS_URI + "> ))}";
-
+
ZonedDateTime zTime = ZonedDateTime.now();
String time = zTime.format(DateTimeFormatter.ISO_INSTANT);
ZonedDateTime zTime1 = zTime.minusHours(1);
String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-
+
ZonedDateTime zTime2 = zTime.minusHours(2);
String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
@@ -471,10 +470,10 @@ public class QueryIT extends RyaExportITBase {
// Verify the end results of the query match the expected results.
runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
-
+
@Test
public void dateTimeWithinNow() throws Exception {
-
+
final ValueFactory vf = new ValueFactoryImpl();
DatatypeFactory dtf = DatatypeFactory.newInstance();
FunctionRegistry.getInstance().add(new DateTimeWithinPeriod());
@@ -482,13 +481,13 @@ public class QueryIT extends RyaExportITBase {
final String sparql = "PREFIX fn: <" + FN.NAMESPACE +">"
+ "SELECT ?event ?startTime WHERE { ?event <uri:startTime> ?startTime. "
+ "FILTER(fn:dateTimeWithin(?startTime, NOW(), 15, <" + OWLTime.SECONDS_URI + "> ))}";
-
+
ZonedDateTime zTime = ZonedDateTime.now();
String time = zTime.format(DateTimeFormatter.ISO_INSTANT);
ZonedDateTime zTime1 = zTime.minusSeconds(30);
String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-
+
Literal lit = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
Literal lit1 = vf.createLiteral(dtf.newXMLGregorianCalendar(time1));
@@ -511,7 +510,7 @@ public class QueryIT extends RyaExportITBase {
}
-
+
@Test
public void periodicQueryTestWithoutAggregation() throws Exception {
String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -800,8 +799,8 @@ public class QueryIT extends RyaExportITBase {
// Verify the end results of the query match the expected results.
runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
-
-
+
+
@Test
public void nestedPeriodicQueryTestWithAggregationAndGroupBy() throws Exception {
String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -879,7 +878,7 @@ public class QueryIT extends RyaExportITBase {
// Verify the end results of the query match the expected results.
runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
-
+
@Test
public void nestedJoinPeriodicQueryWithAggregationAndGroupBy() throws Exception {
String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -1006,6 +1005,7 @@ public class QueryIT extends RyaExportITBase {
// Ensure the result of the query matches the expected result.
assertEquals(expectedResults, results);
}
+
break;
case PERIODIC:
PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
@@ -1014,7 +1014,7 @@ public class QueryIT extends RyaExportITBase {
new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluo);
}
addStatementsAndWait(statements);
-
+
final Set<BindingSet> results = Sets.newHashSet();
try (CloseableIterator<BindingSet> resultIter = periodicStorage.listResults(periodicId, Optional.empty())) {
while (resultIter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
new file mode 100644
index 0000000..91c9977
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class StatementPatternIdCacheIT extends RyaExportITBase {
+
+ /**
+ * Ensure the cache returns the statement pattern IDs of every created query, including a query added after the cache was first read.
+ */
+ @Test
+ public void statementPatternIdCacheTest() throws Exception {
+ // Two queries with two statement patterns each; they share no predicates or objects.
+ final String sparql1 =
+ "SELECT ?x WHERE { " +
+ "?x <urn:pred1> <urn:obj1>. " +
+ "?x <urn:pred2> <urn:obj2>." +
+ "}";
+
+ final String sparql2 =
+ "SELECT ?x WHERE { " +
+ "?x <urn:pred3> <urn:obj3>. " +
+ "?x <urn:pred4> <urn:obj4>." +
+ "}";
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+
+ String pcjId = FluoQueryUtils.createNewPcjId();
+ // Create the first query in Fluo and collect the node IDs of its statement patterns.
+ FluoQuery query1 = new CreateFluoPcj().createPcj(pcjId, sparql1, new HashSet<>(), fluoClient);
+ Set<String> spIds1 = new HashSet<>();
+ for(StatementPatternMetadata metadata: query1.getStatementPatternMetadata()) {
+ spIds1.add(metadata.getNodeId());
+ }
+
+ StatementPatternIdCache cache = new StatementPatternIdCache();
+
+ assertEquals(spIds1, cache.getStatementPatternIds(fluoClient.newTransaction())); // NOTE(review): this Transaction (and the one in the final assert) is never closed - consider try-with-resources
+
+ FluoQuery query2 = new CreateFluoPcj().createPcj(pcjId, sparql2, new HashSet<>(), fluoClient); // NOTE(review): reuses the first query's pcjId - confirm this is intended
+ Set<String> spIds2 = new HashSet<>();
+ for(StatementPatternMetadata metadata: query2.getStatementPatternMetadata()) {
+ spIds2.add(metadata.getNodeId());
+ }
+
+ assertEquals(Sets.union(spIds1, spIds2), cache.getStatementPatternIds(fluoClient.newTransaction()));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a5086ee
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Valid levels:
+# TRACE, DEBUG, INFO, WARN, ERROR and FATAL
+log4j.rootLogger=INFO, CONSOLE
+
+# Set independent logging levels
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.kafka=WARN
+#log4j.logger.org.apache.rya.indexing.pcj.fluo=DEBUG
+
+# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+#log4j.appender.CONSOLE.Threshold=DEBUG
+
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.default.file=org.apache.log4j.FileAppender
+#log4j.appender.default.file.file=/home/cloudera/Desktop/log.out
+#log4j.appender.default.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.default.file.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout
+#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
index 32ee962..48334d0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
@@ -48,41 +48,42 @@ import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
-import org.apache.rya.accumulo.MiniAccumuloSingleton;
-import org.apache.rya.accumulo.RyaTestInstanceRule;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloInstall;
-import org.apache.zookeeper.ClientCnxn;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.sail.Sail;
-import org.openrdf.sail.SailException;
-
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
+import org.apache.rya.accumulo.MiniAccumuloSingleton;
+import org.apache.rya.accumulo.RyaTestInstanceRule;
import org.apache.rya.api.client.Install;
import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloInstall;
import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
import org.apache.rya.sail.config.RyaSailFactory;
+import org.apache.zookeeper.ClientCnxn;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
/**
* Integration tests that ensure the Fluo application processes PCJs results
@@ -156,6 +157,7 @@ public abstract class FluoITBase {
} catch (final Exception e) {
log.error("Could not shut down the Rya Connection.", e);
}
+
}
if (ryaRepo != null) {
@@ -187,6 +189,9 @@ public abstract class FluoITBase {
log.error("Could not shut down the Mini Fluo.", e);
}
}
+
+ StatementPatternIdCacheSupplier.clear();
+ MetadataCacheSupplier.clear();
}
protected void preFluoInitHook() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index 7b16dcf..ca17b2a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -53,6 +53,7 @@ import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
@@ -64,11 +65,14 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.sail.config.RyaSailFactory;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.repository.sail.SailRepositoryConnection;
import org.openrdf.sail.Sail;
@@ -115,6 +119,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverSpecification> observers = new ArrayList<>();
observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+ observers.add(new ObserverSpecification(BatchObserver.class.getName()));
observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
observers.add(new ObserverSpecification(JoinObserver.class.getName()));
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
@@ -185,6 +190,12 @@ public class KafkaExportITBase extends AccumuloExportITBase {
}
}
+ @After
+ public void clearCaches() {
+ StatementPatternIdCacheSupplier.clear();
+ MetadataCacheSupplier.clear();
+ }
+
private void installRyaInstance() throws Exception {
final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
final String instanceName = cluster.getInstanceName();
@@ -261,7 +272,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
* If this test fails then its a testing environment issue, not with Rya.
* Source: https://github.com/asmaier/mini-kafka
*/
-// @Test
+ @Test
public void embeddedKafkaTest() throws Exception {
// create topic
final String topic = "testTopic";
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
index 970cfd8..f540a2e 100644
--- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
+++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
@@ -132,6 +132,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
"PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " +
"SELECT ?cityA ?cityB " +
"WHERE { " +
+ "?cityA <urn:containedIn> ?continent. " +
+ "?cityB <urn:containedIn> ?continent. " +
"?cityA geo:asWKT ?coord1 . " +
"?cityB geo:asWKT ?coord2 . " +
// from brussels 173km to amsterdam
@@ -147,9 +149,13 @@ public class GeoFunctionsIT extends RyaExportITBase {
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar2"), asWKT, vf.createLiteral("Point(-17.45 14.69)", wktTypeUri)),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)),
- vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)));
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar"), vf.createURI("urn:containedIn"), vf.createLiteral("Africa")),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar2"), vf.createURI("urn:containedIn"), vf.createLiteral("Africa")),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")));
- // The expected results of the SPARQL query once the PCJ has been computed.
+ // The expected results of the SPARQL query once the PCJ has been computed.
final Set<BindingSet> expectedResults = new HashSet<>();
MapBindingSet bs = new MapBindingSet();
@@ -234,6 +240,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
"PREFIX geof: <http://www.opengis.net/def/function/geosparql/> " +
"PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " +
"SELECT ?cityA ?cityB { " +
+ "?cityA <urn:containedIn> ?continent. " +
+ "?cityB <urn:containedIn> ?continent. " +
"?cityA geo:asWKT ?coord1 . " +
"?cityB geo:asWKT ?coord2 . " +
" FILTER ( geof:sfIntersects(?coord1, ?coord2) ) " +
@@ -248,6 +256,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam2"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam2"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)));
// The expected results of the SPARQL query once the PCJ has been computed.