You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2016/04/26 17:26:09 UTC
gora git commit: GORA-471 Datastore for Infinispan
Repository: gora
Updated Branches:
refs/heads/master 96e623b03 -> 8267561e5
GORA-471 Datastore for Infinispan
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8267561e
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8267561e
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8267561e
Branch: refs/heads/master
Commit: 8267561e52a02bf9faf70a24484aac105cdcc9d0
Parents: 96e623b
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Fri Mar 25 18:42:02 2016 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Fri Mar 25 18:42:02 2016 -0700
----------------------------------------------------------------------
gora-infinispan/.gitignore | 1 +
gora-infinispan/LICENSE | 202 +++++++++++
gora-infinispan/README.md | 69 ++++
gora-infinispan/pom.xml | 217 +++++++++++
.../gora/infinispan/query/InfinispanQuery.java | 361 +++++++++++++++++++
.../gora/infinispan/query/InfinispanResult.java | 81 +++++
.../gora/infinispan/store/InfinispanClient.java | 169 +++++++++
.../gora/infinispan/store/InfinispanStore.java | 297 +++++++++++++++
.../infinispan/GoraInfinispanTestDriver.java | 109 ++++++
.../gora/infinispan/SimulationDriver.java | 156 ++++++++
.../java/org/apache/gora/infinispan/Utils.java | 50 +++
.../mapreduce/InfinispanStoreMapReduceTest.java | 79 ++++
.../infinispan/store/InfinispanStoreTest.java | 174 +++++++++
.../src/test/resources/gora.properties | 19 +
.../src/test/resources/log4j.properties | 51 +++
.../src/test/resources/simplelogger.properties | 17 +
pom.xml | 1 +
17 files changed, 2053 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/.gitignore
----------------------------------------------------------------------
diff --git a/gora-infinispan/.gitignore b/gora-infinispan/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/gora-infinispan/.gitignore
@@ -0,0 +1 @@
+/target/
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/LICENSE
----------------------------------------------------------------------
diff --git a/gora-infinispan/LICENSE b/gora-infinispan/LICENSE
new file mode 100644
index 0000000..8f71f43
--- /dev/null
+++ b/gora-infinispan/LICENSE
@@ -0,0 +1,202 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/README.md
----------------------------------------------------------------------
diff --git a/gora-infinispan/README.md b/gora-infinispan/README.md
new file mode 100644
index 0000000..dfc872d
--- /dev/null
+++ b/gora-infinispan/README.md
@@ -0,0 +1,69 @@
+# gora-infinispan
+
+### Description
+
+The Apache Gora open source framework provides an in-memory data model and persistence for big data. Gora supports persisting to column stores, key value stores, document stores and RDBMSs, and analyzing data with extensive [Apache Hadoop](https://hadoop.apache.org/) [MapReduce](https://en.wikipedia.org/wiki/MapReduce) support.
+
+This project provides a Gora support for the [Infinispan](http://infinispan.org) storage system, allowing it to store data for Hadoop based applications, such as Apache [Nutch](http://nutch.apache.org/) or [Giraph](http://giraph.apache.org/).
+
+### Requirements
+
+[infinispan-avro-8.0.0.CR1](https://github.com/leads-project/infinispan-avro)
+
+### Installation
+
+This project is based upon Maven. It makes use of Infinispan 7.2.5.Final and the Avro support for Infinispan which is available [here](https://github.com/infinispan/infinispan). Below, we explain how to execute an installation.
+
+```
+# Building and installing infinispan-avro
+git clone https://github.com/leads-project/infinispan-avro.git
+cd infinispan-avro
+mvn clean install -DskipTests
+
+# Building and installing gora-infinispan
+git clone https://github.com/leads-project/gora-infinispan.git
+cd gora-infinispan
+mvn clean install -DskipTests
+```
+
+### Usage
+
+Gora allows a user application to store, retrieve and query Avro defined types. As of version 0.6, it offers [CRUD operations](http://gora.apache.org/current/api/apidocs-0.6/org/apache/gora/store/DataStore.html) and [query](http://gora.apache.org/current/api/apidocs-0.6/org/apache/gora/query/Query.html) that handle pagination, key range restriction, [filtering](http://gora.apache.org/current/api/apidocs-0.6/org/apache/gora/filter/Filter.html) and projection.
+
+The key interest of Gora is to offer a direct support for Hadoop to the data stores that implement its API. Under the hood, such a feature comes from a bridge between the [ImputFormat](http://gora.apache.org/current/api/apidocs-0.6/org/apache/gora/mapreduce/GoraInputFormat.html) and [OutputFormat](http://gora.apache.org/current/api/apidocs-0.6/org/apache/gora/mapreduce/GoraOutputFormat.html) classes and the [DataStore](http://gora.apache.org/current/api/apidocs-0.6/org/apache/gora/store/DataStore.html) class.
+
+This Infinispan support for Gora pass all the unit tests of the framework. All the querying operations are handled at the server side, and splitting a query allows to execute it at each of the Infinispan server, close to the data. Thanks to this last feature, map-reduce jobs that run atop of Infinisapn are locality-aware.
+
+## Code Sample
+
+In the samples below, we first duplicate a query across all the servers, then we execute two filtering operations.
+
+```java
+Utils.populateEmployeeStore(employeeStore, NEMPLOYEE);
+InfinispanQuery<String,Employee> query;
+
+// Partitioning
+int retrieved = 0;
+query = new InfinispanQuery<>(employeeDataStore);
+query.build();
+for (PartitionQuery<String,Employee> q : employeeDataStore.getPartitions(query)) {
+retrieved+=((InfinispanQuery<String,Employee>) q).list().size();
+}
+assert retrieved==NEMPLOYEE;
+
+// Test matching everything
+query = new InfinispanQuery<>(employeeDataStore);
+SingleFieldValueFilter filter = new SingleFieldValueFilter();
+filter.setFieldName("name");
+filter.setFilterOp(FilterOp.EQUALS);
+List<Object> operaands = new ArrayList<>();
+operaands.add("*");
+filter.setOperands(operaands);
+query.setFilter(filter);
+query.build();
+List<Employee> result = new ArrayList<>();
+for (PartitionQuery<String,Employee> q : employeeDataStore.getPartitions(query)) {
+result.addAll(((InfinispanQuery<String,Employee>)q).list());
+}
+assertEquals(NEMPLOYEE,result.size());
+```
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/pom.xml
----------------------------------------------------------------------
diff --git a/gora-infinispan/pom.xml b/gora-infinispan/pom.xml
new file mode 100644
index 0000000..92a8b70
--- /dev/null
+++ b/gora-infinispan/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora</artifactId>
+ <version>0.7-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+ <artifactId>gora-infinispan</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Apache Gora :: Infinispan</name>
+ <url>http://gora.apache.org</url>
+ <description>
+ The Apache Gora open source framework provides an in-memory data model and
+ persistence for big data. Gora supports persisting to column stores, key value stores,
+ document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce
+ support. This module provides a Gora support for the Infinispan storage system.
+ </description>
+ <inceptionYear>2015</inceptionYear>
+
+ <properties>
+ <infinispan.version>7.2.5.Final</infinispan.version>
+ </properties>
+
+ <build>
+ <directory>target</directory>
+ <outputDirectory>target/classes</outputDirectory>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <testOutputDirectory>target/test-classes</testOutputDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>${build-helper-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/examples/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+
+ <!-- Gora Internal Dependencies -->
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ </dependency>
+
+ <!-- Infinispan Dependencies -->
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-remote-query-client</artifactId>
+ <version>${infinispan.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-avro-server</artifactId>
+ <version>${infinispan.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-avro-hotrod</artifactId>
+ <version>${infinispan.version}</version>
+ </dependency>
+
+ <!-- Logging Dependencies -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${hadoop-2.test.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop-2.test.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop-2.test.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <type>test-jar</type>
+ <version>${hadoop-2.test.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-hs</artifactId>
+ <version>${hadoop-2.test.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-core</artifactId>
+ <version>${infinispan.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-client-hotrod</artifactId>
+ <version>${infinispan.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-server-core</artifactId>
+ <version>${infinispan.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-server-hotrod</artifactId>
+ <version>${infinispan.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-server-hotrod</artifactId>
+ <version>${infinispan.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/main/java/org/apache/gora/infinispan/query/InfinispanQuery.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/main/java/org/apache/gora/infinispan/query/InfinispanQuery.java b/gora-infinispan/src/main/java/org/apache/gora/infinispan/query/InfinispanQuery.java
new file mode 100644
index 0000000..9e0b62f
--- /dev/null
+++ b/gora-infinispan/src/main/java/org/apache/gora/infinispan/query/InfinispanQuery.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.infinispan.query;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.MapFieldValueFilter;
+import org.apache.gora.filter.SingleFieldValueFilter;
+import org.apache.gora.infinispan.store.InfinispanStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.hadoop.io.WritableUtils;
+import org.infinispan.avro.client.Support;
+import org.infinispan.avro.hotrod.QueryBuilder;
+import org.infinispan.avro.hotrod.RemoteQuery;
+import org.infinispan.query.dsl.FilterConditionContext;
+import org.infinispan.query.dsl.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/*
+ * * @author Pierre Sutra
+ */
+public class InfinispanQuery<K,T extends PersistentBase>
+extends QueryBase<K,T>
+implements PartitionQuery<K,T>, Cloneable{
+
+ public static final Logger LOG = LoggerFactory.getLogger(InfinispanQuery.class);
+ private static final String ADDR_DELIMITATOR = ";";
+
+ private RemoteQuery q;
+ private InetSocketAddress location;
+ private long offset;
+
+ protected String sortingField;
+ protected boolean isAscendant;
+
+ public InfinispanQuery(){
+ super(null);
+ this.localFilterEnabled = false;
+ this.setOffset(-1);
+ this.isAscendant = true;
+ this.sortingField = "";
+ }
+
+ public InfinispanQuery(InfinispanStore<K, T> dataStore) {
+ super(dataStore);
+ this.localFilterEnabled = false;
+ this.setOffset(-1);
+ this.isAscendant = true;
+ this.sortingField = "";
+ }
+
+ public boolean isBuilt() {
+ LOG.debug("isBuilt()");
+ return q!=null;
+ }
+
+ public void rebuild(){
+ LOG.debug("rebuild()");
+ q=null;
+ build();
+ }
+
+ public void build(){
+ LOG.debug("build()");
+
+ FilterConditionContext context = null;
+
+ if(q!=null) {
+ LOG.trace("Query already built; ignoring.");
+ return;
+ }
+
+ QueryBuilder qb = ((InfinispanStore<K,T>)dataStore).getClient().getQueryBuilder();
+
+ if (filter instanceof MapFieldValueFilter){
+ MapFieldValueFilter mfilter = (MapFieldValueFilter) filter;
+ if (!(mfilter.getMapKey() instanceof Utf8))
+ throw new IllegalAccessError("Invalid map key, must be a string.");
+ if (mfilter.getOperands().size()>1)
+ throw new IllegalAccessError("MapFieldValueFilter operand not supported.");
+ if (!(mfilter.getOperands().get(0) instanceof String))
+ throw new IllegalAccessError("Invalid operand, must be a string.");
+ String value = mfilter.getMapKey()+ Support.DELIMITER+mfilter.getOperands().get(0).toString();
+ switch (mfilter.getFilterOp()) {
+ case EQUALS:
+ if (value.equals("*")) {
+ context = qb.having(mfilter.getFieldName()).like(value);
+ } else {
+ context = qb.having(mfilter.getFieldName()).eq(value);
+ }
+ if (!((MapFieldValueFilter) filter).isFilterIfMissing()) {
+ LOG.warn("Forcing isFilterMissing to true");
+ ((MapFieldValueFilter) filter).setFilterIfMissing(true);
+ }
+ break;
+ case NOT_EQUALS:
+ if (value.equals("*")) {
+ context = qb.not().having(mfilter.getFieldName()).like(value);
+ } else {
+ context = qb.not().having(mfilter.getFieldName()).eq(value);
+ }
+ if (!((MapFieldValueFilter) filter).isFilterIfMissing()) {
+ LOG.warn("Forcing isFilterMissing to false");
+ ((MapFieldValueFilter) filter).setFilterIfMissing(false);
+ }
+ break;
+ default:
+ throw new IllegalAccessError("FilterOp not supported..");
+ }
+
+ } else if (filter instanceof SingleFieldValueFilter){
+ SingleFieldValueFilter sfilter = (SingleFieldValueFilter) filter;
+ if (sfilter.getOperands().size()>1)
+ throw new IllegalAccessError("SingleFieldValueFilter operand not supported.");
+ Object value = sfilter.getOperands().get(0);
+ switch (sfilter.getFilterOp()) {
+ case EQUALS:
+ if (value.equals("*")) {
+ context = qb.having(sfilter.getFieldName()).like((String)value);
+ } else {
+ context = qb.having(sfilter.getFieldName()).eq(value);
+ }
+ break;
+ case NOT_EQUALS:
+ if (value.equals("*")) {
+ context = qb.not().having(sfilter.getFieldName()).like((String)value);
+ } else {
+ context = qb.not().having(sfilter.getFieldName()).eq(value);
+ }
+ break;
+ case LESS:
+ context = qb.having(sfilter.getFieldName()).lt(value);
+ break;
+ case LESS_OR_EQUAL:
+ context = qb.having(sfilter.getFieldName()).lte(value);
+ break;
+ case GREATER:
+ context = qb.having(sfilter.getFieldName()).gt(value);
+ break;
+ case GREATER_OR_EQUAL:
+ context = qb.having(sfilter.getFieldName()).gte(value);
+ break;
+ default:
+ throw new IllegalAccessError("FilterOp not supported..");
+ }
+
+ } else if (filter!=null) {
+ throw new IllegalAccessError("Filter not supported.");
+ }
+
+ if (this.startKey==this.endKey && this.startKey != null ){
+ (context == null ? qb : context.and()).having(getPrimaryFieldName()).eq(this.startKey);
+ }else{
+ if (this.startKey!=null && this.endKey!=null)
+ context = (context == null ? qb : context.and()).having(getPrimaryFieldName()).between(this.startKey,this.endKey);
+ else if (this.startKey!=null)
+ context = (context == null ? qb : context.and()).having(getPrimaryFieldName()).between(this.startKey,null);
+ else if (this.endKey!=null)
+ (context == null ? qb : context.and()).having(getPrimaryFieldName()).between(null,this.endKey);
+ }
+
+ // if projection enabled, keep the primary field.
+ if (fields!=null && fields.length > 0) {
+ String[] fieldsWithPrimary;
+ List<String> fieldsList = new ArrayList<>(Arrays.asList(fields));
+ if (!fieldsList.contains(getPrimaryFieldName())) {
+ fieldsWithPrimary = Arrays.copyOf(fields, fields.length + 1);
+ fieldsWithPrimary[fields.length] = getPrimaryFieldName();
+ }else{
+ fieldsWithPrimary = fieldsList.toArray(new String[]{});
+ }
+ qb.setProjection(fieldsWithPrimary);
+ }
+
+ qb.orderBy(
+ (getSortingField().equals("")) ? getPrimaryFieldName() : getSortingField(),
+ isAscendant ? SortOrder.ASC : SortOrder.DESC);
+
+ if (this.getOffset()>=0)
+ qb.startOffset(this.getOffset());
+
+ if (this.getLimit()>0)
+ qb.maxResults((int) this.getLimit());
+
+ q = (RemoteQuery) qb.build();
+
+ if (location!=null)
+ q.setLocation(location);
+ }
+
+ public List<T> list(){
+ LOG.debug("list()");
+ if (!isBuilt()) build();
+ return q.list();
+
+ }
+
+ public List<PartitionQuery<K,T>> split() {
+ LOG.debug("split()");
+ if(!isBuilt()) build();
+ List<PartitionQuery<K,T>> splits = new ArrayList<>();
+ QueryBuilder qb = ((InfinispanStore<K,T>)dataStore).getClient().getQueryBuilder();
+ Collection<RemoteQuery> Queries = qb.split(this.q);
+ for (RemoteQuery Query : Queries) {
+ InfinispanQuery<K,T> split = (InfinispanQuery<K, T>) this.clone();
+ split.q = Query;
+ split.location = Query.getLocation();
+ splits.add(split);
+ }
+ LOG.trace(splits.toString());
+ return splits;
+ }
+
+ public int getResultSize(){
+ return q.getResultSize();
+ }
+
+ public String getPrimaryFieldName(){
+ return ((InfinispanStore)dataStore).getPrimaryFieldName();
+ }
+
+ @Override
+ public Object clone() {
+ InfinispanQuery<K,T> query = null;
+ try {
+ query = (InfinispanQuery<K, T>) super.clone();
+ } catch (CloneNotSupportedException e) {
+ // not reachable.
+ }
+ query.setDataStore(this.getDataStore());
+ query.setFilter(this.getFilter());
+ query.setFields(this.getFields());
+ query.setKeyRange(this.getStartKey(), this.getEndKey());
+ query.setConf(this.getConf());
+ query.setStartTime(this.getStartTime());
+ query.setEndTime(this.getEndTime());
+ query.setLocalFilterEnabled(this.isLocalFilterEnabled());
+ query.setLimit(this.getLimit());
+ query.setOffset(this.getOffset());
+ query.setQueryString(this.getQueryString());
+ query.setSortingField(this.getSortingField());
+ query.setSortingOrder(this.getSortingOrder());
+ query.q = this.q;
+ query.location = this.location;
+ return query;
+ }
+
+ @Override
+ public String[] getLocations() {
+ if (location==null)
+ return new String[0];
+ String[] result = new String[1];
+ result[0] = location.getHostString();
+ return result;
+ }
+
+ public InetSocketAddress getLocation(){
+ return location;
+ }
+
+ // FIXME use the write non-null fields function.
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ sortingField = WritableUtils.readString(in);
+ isAscendant = in.readBoolean();
+ String locationString = WritableUtils.readString(in);
+ if (!locationString.equals(""))
+ location = new InetSocketAddress(
+ locationString.split(ADDR_DELIMITATOR)[0],
+ Integer.valueOf(locationString.split(ADDR_DELIMITATOR)[1]));
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ WritableUtils.writeString(out, getSortingField());
+ out.writeBoolean(isAscendant);
+ if (location!=null)
+ WritableUtils.writeString(out,location.getHostName()+ADDR_DELIMITATOR+location.getPort());
+ else
+ WritableUtils.writeString(out,"");
+ }
+
+ @Override
+ public String toString() {
+ ToStringBuilder builder = new ToStringBuilder(this);
+ builder.append("dataStore", dataStore);
+ builder.append("location", location==null ? null :location.toString());
+ builder.append("fields", fields);
+ builder.append("startKey", startKey);
+ builder.append("endKey", endKey);
+ builder.append("filter", filter);
+ builder.append("limit", limit);
+ builder.append("offset", offset);
+ builder.append("localFilterEnabled", localFilterEnabled);
+ return builder.toString();
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public String getQueryString() {
+ return queryString;
+ }
+
+ public void setQueryString(String queryString) {
+ this.queryString = queryString;
+ }
+
+ public String getSortingField() {
+ return sortingField;
+ }
+
+ public void setSortingField(String field) {
+ sortingField = field;
+ }
+
+ public boolean getSortingOrder() {
+ return isAscendant;
+ }
+
+ public void setSortingOrder(boolean isAscendant) {
+ this.isAscendant = isAscendant;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/main/java/org/apache/gora/infinispan/query/InfinispanResult.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/main/java/org/apache/gora/infinispan/query/InfinispanResult.java b/gora-infinispan/src/main/java/org/apache/gora/infinispan/query/InfinispanResult.java
new file mode 100644
index 0000000..3429a27
--- /dev/null
+++ b/gora-infinispan/src/main/java/org/apache/gora/infinispan/query/InfinispanResult.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.infinispan.query;
+
+import org.apache.gora.infinispan.store.InfinispanStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/*
+ * @author Pierre Sutra, Valerio Schiavoni
+ */
+public class InfinispanResult<K, T extends PersistentBase> extends ResultBase<K, T> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(InfinispanResult.class);
+
+ private List<T> list;
+ private int current;
+ private int primaryFieldPos;
+
+ public InfinispanResult(DataStore<K, T> dataStore, InfinispanQuery<K, T> query) {
+ super(dataStore, query);
+ list = query.list();
+ current = 0;
+ primaryFieldPos = ((InfinispanStore<K,T>)dataStore).getPrimaryFieldPos();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ LOG.debug("getProgress()");
+ if (list.size()==0) return 1;
+ float progress = ((float)current/(float)list.size());
+ LOG.trace("progress: "+progress);
+ return progress;
+ }
+
+ @Override
+ protected boolean nextInner() throws IOException {
+ LOG.debug("nextInner()");
+ if(current==list.size()) {
+ LOG.trace("end");
+ return false;
+ }
+ persistent = list.get(current);
+ key = (K) list.get(current).get(primaryFieldPos);
+ current++;
+ LOG.trace("current: "+persistent);
+ return true;
+ }
+
+ public int size() {
+ return list.size();
+ }
+
+ @Override
+ protected void clear() {
+ LOG.debug("clear()");
+ // do nothing
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java
new file mode 100644
index 0000000..ccd45e3
--- /dev/null
+++ b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.infinispan.store;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.infinispan.avro.client.Marshaller;
+import org.infinispan.avro.client.Support;
+import org.infinispan.avro.hotrod.QueryBuilder;
+import org.infinispan.avro.hotrod.QueryFactory;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.commons.api.BasicCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/*
+ * @author Pierre Sutra, Valerio Schiavoni
+ */
+public class InfinispanClient<K, T extends PersistentBase> implements Configurable{
+
+ public static final Logger LOG = LoggerFactory.getLogger(InfinispanClient.class);
+
+ public static final String ISPN_CONNECTION_STRING_KEY = "infinispan.connectionstring";
+ public static final String ISPN_CONNECTION_STRING_DEFAULT = "127.0.0.1:11222";
+
+ private Configuration conf;
+
+ private Class<K> keyClass;
+ private Class<T> persistentClass;
+ private RemoteCacheManager cacheManager;
+ private QueryFactory qf;
+
+ private RemoteCache<K, T> cache;
+ private boolean cacheExists;
+
+ private Map<K,T> toPut;
+
+ public InfinispanClient() {
+ conf = new Configuration();
+ }
+
+ public synchronized void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws Exception {
+
+ if (cache!=null)
+ return; // already initialized.
+
+ this.keyClass = keyClass;
+ this.persistentClass = persistentClass;
+
+ String host = properties.getProperty(ISPN_CONNECTION_STRING_KEY,
+ getConf().get(ISPN_CONNECTION_STRING_KEY, ISPN_CONNECTION_STRING_DEFAULT));
+ conf.set(ISPN_CONNECTION_STRING_KEY, host);
+ properties.setProperty(ISPN_CONNECTION_STRING_KEY, host);
+ LOG.info("Connecting client to "+host);
+
+ Marshaller<T> marshaller = new Marshaller<T>(persistentClass);
+ ConfigurationBuilder builder = new ConfigurationBuilder();
+ builder.addServers(host);
+ builder.marshaller(marshaller);
+ cacheManager = new RemoteCacheManager(builder.build());
+ cacheManager.start();
+
+ cache = cacheManager.getCache(persistentClass.getSimpleName());
+ qf = org.infinispan.avro.hotrod.Search.getQueryFactory(cache);
+ createSchema();
+
+ toPut = new HashMap<>();
+ }
+
+ public boolean cacheExists(){
+ return cacheExists;
+ }
+
+ public void createSchema() {
+ try {
+ Support.registerSchema(cacheManager, persistentClass.newInstance().getSchema());
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void createCache() {
+ createSchema();
+ cacheExists = true;
+ }
+
+ public void dropCache() {
+ cache.clear();
+ cacheExists = false;
+ }
+
+ public void deleteByKey(K key) {
+ cache.remove(key);
+ }
+
+ public synchronized void put(K key, T val) {
+ toPut.put(key, val);
+ }
+
+ public void putIfAbsent(K key, T obj) {
+ this.cache.putIfAbsent(key,obj);
+ }
+
+ public T get(K key){
+ return cache.get(key);
+ }
+
+ public boolean containsKey(K key) {
+ return cache.containsKey(key);
+ }
+
+ public String getCacheName() {
+ return this.persistentClass.getSimpleName();
+ }
+
+ public BasicCache<K, T> getCache() {
+ return this.cache;
+ }
+
+ public QueryBuilder getQueryBuilder() {
+ return (QueryBuilder) qf.from(persistentClass);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf =conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+
+ public void flush(){
+ LOG.debug("flush()");
+ if (!toPut.isEmpty()) cache.putAll(toPut);
+ toPut.clear();
+ }
+
+ public synchronized void close() {
+ LOG.debug("close()");
+ flush();
+ getCache().stop();
+ cacheManager.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java
new file mode 100644
index 0000000..488e477
--- /dev/null
+++ b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.infinispan.store;
+
+import org.apache.gora.infinispan.query.InfinispanQuery;
+import org.apache.gora.infinispan.query.InfinispanResult;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.gora.mapreduce.GoraRecordReader.BUFFER_LIMIT_READ_NAME;
+import static org.apache.gora.mapreduce.GoraRecordReader.BUFFER_LIMIT_READ_VALUE;
+
+/**
+ * {@link org.apache.gora.infinispan.store.InfinispanStore} is the primary class
+ * responsible for directing Gora CRUD operations to Infinispan.This class delegate
+ * most operations, e.g., initialization, creation and deletion to (Infinispan caches),
+ * via {@link org.apache.gora.infinispan.store.InfinispanClient}.
+ *
+ * To specify the Infinispan deployment, include parameter <i>infinispan.connectionstring</i>
+ * in <i>gora.properties</i> with the list of servers, e.g., "127.0.0.1:11222,127.0.0.1:11223".
+ *
+ * @author Pierre Sutra, Valerio Schiavoni
+ *
+ */
+public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(InfinispanStore.class);
+
+ private InfinispanClient<K, T> infinispanClient;
+ private String primaryFieldName;
+ private int primaryFieldPos;
+ private int splitSize;
+
+ public InfinispanStore() throws Exception {}
+
+ @Override
+ public synchronized void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
+
+ try {
+
+ if (primaryFieldName!=null) {
+ LOG.info("Client already initialized; ignoring.");
+ return;
+ }
+
+ super.initialize(keyClass, persistentClass, properties);
+ infinispanClient = new InfinispanClient<>();
+ infinispanClient.setConf(conf);
+
+ LOG.info("key class: "
+ + keyClass.getCanonicalName()
+ + ", persistent class: "
+ + persistentClass.getCanonicalName());
+ schema = persistentClass.newInstance().getSchema();
+
+ splitSize = Integer.valueOf(
+ properties.getProperty( BUFFER_LIMIT_READ_NAME,
+ getConf().get(
+ BUFFER_LIMIT_READ_NAME,
+ Integer.toString(BUFFER_LIMIT_READ_VALUE))));
+ LOG.info("split size: "+splitSize);
+
+ primaryFieldPos = 0;
+ primaryFieldName = schema.getFields().get(0).name();
+ this.infinispanClient.initialize(keyClass, persistentClass, properties);
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ LOG.debug("close()");
+ infinispanClient.close();
+ }
+
+ @Override
+ public void createSchema() {
+ LOG.debug("createSchema()");
+ this.infinispanClient.createCache();
+ }
+
+ @Override
+ public boolean delete(K key) {
+ LOG.debug("delete(" + key+")");
+ this.infinispanClient.deleteByKey(key);
+ return true;
+ }
+
+ @Override
+ public long deleteByQuery(Query<K, T> query) {
+ ((InfinispanQuery<K, T>) query).build();
+ LOG.debug("deleteByQuery("+query.toString()+")");
+ InfinispanQuery<K, T> q = (InfinispanQuery) query;
+ q.build();
+ for( T t : q.list()){
+ infinispanClient.deleteByKey((K) t.get(primaryFieldPos));
+ }
+ return q.getResultSize();
+ }
+
+ @Override
+ public void deleteSchema() {
+ LOG.debug("deleteSchema()");
+ this.infinispanClient.dropCache();
+ }
+
+ @Override
+ public Result<K, T> execute(Query<K, T> query) {
+ LOG.debug("execute()");
+ ((InfinispanQuery<K,T>)query).build();
+ InfinispanResult<K,T> result = new InfinispanResult<>(this, (InfinispanQuery<K,T>)query);
+ LOG.trace("query: " + query.toString());
+ LOG.trace("result size: " + result.size());
+ return result;
+ }
+
+ @Override
+ public T get(K key){
+ LOG.debug("get("+key+")");
+ return infinispanClient.get(key);
+ }
+
+ @Override
+ public T get(K key, String[] fields) {
+ LOG.debug("get("+key+","+fields+")");
+ if (fields==null)
+ return infinispanClient.get(key);
+
+ InfinispanQuery query = new InfinispanQuery(this);
+ query.setKey(key);
+ query.setFields(fields);
+ query.build();
+
+
+ Result<K,T> result = query.execute();
+ try {
+ result.next();
+ return result.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ *
+ * Split the query per infinispan node resulting in a list of queries.
+ * For each Infinispan server, this function returns a set of qeuries
+ * using pagination of the originial query. The size of each query
+ * in this pagination equals <i>gora.buffer.read.limit</i>.
+ *
+ * @param query the base query to create the partitions for. If the query
+ * is null, then the data store returns the partitions for the default query
+ * (returning every object)
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+ throws IOException {
+ LOG.debug("getPartitions()");
+
+ // 1 - split the query per location
+ List<PartitionQuery<K,T>> locations = ((InfinispanQuery<K,T>)query).split();
+
+ // 2 -split each location
+ List<PartitionQuery<K,T>> splitLocations = new ArrayList<>();
+ for(PartitionQuery<K,T> location : locations) {
+
+ LOG.trace("location: "+ ((InfinispanQuery)location).getLocation().toString());
+
+ // 2.1 - compute the result size
+ InfinispanQuery<K,T> sizeQuery = (InfinispanQuery<K, T>) ((InfinispanQuery<K, T>) location).clone();
+ sizeQuery.setFields(primaryFieldName);
+ sizeQuery.setLimit(1);
+ sizeQuery.rebuild();
+
+ // 2.2 - check if splitting is necessary
+ int resultSize = sizeQuery.getResultSize();
+ long queryLimit = query.getLimit();
+ long splitLimit = queryLimit>0 ? Math.min((long)resultSize,queryLimit) : resultSize;
+ LOG.trace("split limit: "+ splitLimit);
+ LOG.trace("split size: "+ splitSize);
+ if (splitLimit <= splitSize) {
+ LOG.trace("location returned");
+ splitLocations.add(location);
+ continue;
+ }
+
+ // 2.3 - compute the splits
+ for(int i=0; i<Math.ceil((double)splitLimit/(double)splitSize); i++) {
+ InfinispanQuery<K, T> split = (InfinispanQuery<K, T>) ((InfinispanQuery<K, T>) location).clone();
+ split.setOffset(i * splitSize);
+ split.setLimit(splitSize);
+ split.rebuild();
+ splitLocations.add(split);
+ }
+ }
+
+ return splitLocations;
+ }
+
+ @Override
+ public void flush() {
+ LOG.debug("flush()");
+ infinispanClient.flush();
+ }
+
+ /**
+ * In Infinispan, Schemas are referred to as caches.
+ *
+ * @return Cache
+ */
+ @Override
+ public String getSchemaName() {
+ LOG.debug("getSchemaName()");
+ return this.infinispanClient.getCacheName();
+ }
+
+ @Override
+ public Query<K, T> newQuery() {
+ LOG.debug("newQuery()");
+ Query<K, T> query = new InfinispanQuery<K, T>(this);
+ query.setFields(getFieldsToQuery(null));
+ return query;
+ }
+
+ @Override
+ public void put(K key, T obj) {
+ LOG.debug("put(" +key.toString()+")");
+ LOG.trace(obj.toString());
+
+ if (obj.get(primaryFieldPos)==null)
+ obj.put(primaryFieldPos,key);
+
+ if (!obj.get(primaryFieldPos).equals(key) )
+ LOG.warn("Invalid or different primary field :"+key+"<->"+obj.get(primaryFieldPos));
+
+ this.infinispanClient.put(key, obj);
+ }
+
+ @Override
+ public boolean schemaExists() {
+ LOG.debug("schemaExists()");
+ return infinispanClient.cacheExists();
+ }
+
+ public InfinispanClient<K, T> getClient() {
+ LOG.debug("getClient()");
+ return infinispanClient;
+ }
+
+ public String getPrimaryFieldName() {
+ LOG.debug("getPrimaryField()");
+ return primaryFieldName;
+ }
+
+ public void setPrimaryFieldName(String name){
+ LOG.debug("getPrimaryFieldName()");
+ primaryFieldName = name;
+ }
+
+ public int getPrimaryFieldPos(){
+ LOG.debug("getPrimaryFieldPos()");
+ return primaryFieldPos;
+ }
+
+ public void setPrimaryFieldPos(int p){
+ LOG.debug("setPrimaryFieldPos()");
+ primaryFieldPos = p;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/test/java/org/apache/gora/infinispan/GoraInfinispanTestDriver.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/test/java/org/apache/gora/infinispan/GoraInfinispanTestDriver.java b/gora-infinispan/src/test/java/org/apache/gora/infinispan/GoraInfinispanTestDriver.java
new file mode 100644
index 0000000..17e3e9c
--- /dev/null
+++ b/gora-infinispan/src/test/java/org/apache/gora/infinispan/GoraInfinispanTestDriver.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @author valerio schiavoni
+ *
+ */
+
+package org.apache.gora.infinispan;
+
+import org.apache.gora.GoraTestDriver;
+import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.infinispan.store.InfinispanStore;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper class for third party tests using gora-infinispan backend.
+ *
+ * @see GoraTestDriver for test specifics. This driver is the base for all test
+ * cases that require an embedded Infinispan server. It starts (setUp) and
+ * stops (tearDown) embedded Infinispan server.
+ *
+ * * @author Pierre Sutra, Valerio Schiavoni
+ *
+ */
+
+public class GoraInfinispanTestDriver extends GoraTestDriver {
+
+ private static Logger log = LoggerFactory.getLogger(GoraInfinispanTestDriver.class);
+
+ private SimulationDriver delegate;
+ private int numbderOfNodes;
+ public List<String> cacheNames;
+
+ public GoraInfinispanTestDriver(int numbderOfNodes) {
+ this(numbderOfNodes, null);
+ }
+
+ public GoraInfinispanTestDriver(int numbderOfNodes, List<String> cacheNames){
+ super(InfinispanStore.class);
+ this.cacheNames = new ArrayList<>();
+ this.numbderOfNodes = numbderOfNodes;
+ if (cacheNames!=null) {
+ this.cacheNames.addAll(cacheNames);
+ }
+ }
+
+ public String connectionString(){
+ return delegate.connectionString();
+ }
+
+ @Override
+ public void setUpClass() throws Exception {
+ super.setUpClass();
+ log.info("Starting Infinispan...");
+ delegate = new SimulationDriver(numbderOfNodes,cacheNames);
+ try{
+ delegate.create();
+ }catch (Throwable e){
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void tearDownClass() throws Exception {
+ super.tearDownClass();
+ log.info("Stopping Infinispan...");
+ delegate.destroy();
+ }
+
+ @Override
+ public<K, T extends Persistent> DataStore<K,T>
+ createDataStore(Class<K> keyClass, Class<T> persistentClass) throws GoraException {
+ InfinispanStore store = (InfinispanStore) super.createDataStore(keyClass, persistentClass);
+ if (persistentClass.equals(Employee.class)) {
+ store.setPrimaryFieldName("ssn");
+ store.setPrimaryFieldPos(2);
+ }else if(persistentClass.equals(WebPage.class)) {
+ store.setPrimaryFieldName("url");
+ store.setPrimaryFieldPos(0);
+ }
+ return store;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/test/java/org/apache/gora/infinispan/SimulationDriver.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/test/java/org/apache/gora/infinispan/SimulationDriver.java b/gora-infinispan/src/test/java/org/apache/gora/infinispan/SimulationDriver.java
new file mode 100644
index 0000000..a292f2c
--- /dev/null
+++ b/gora-infinispan/src/test/java/org/apache/gora/infinispan/SimulationDriver.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.infinispan;
+
+import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.cache.Index;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Transport;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.fwk.TestResourceTracker;
+import org.infinispan.test.fwk.TransportFlags;
+import org.infinispan.transaction.TransactionMode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Collections.EMPTY_LIST;
+import static org.infinispan.server.hotrod.test.HotRodTestingUtil.hotRodCacheConfiguration;
+import static org.infinispan.test.TestingUtil.blockUntilCacheStatusAchieved;
+
+/**
+ * @author Pierre Sutra
+ */
+public class SimulationDriver extends MultipleCacheManagersTest {
+
+ private int numberOfNodes = 0 ;
+ private List<String> cacheNames = EMPTY_LIST;
+ private List<HotRodServer> servers = new ArrayList<>();
+ private String connectionString ="";
+
+ public SimulationDriver(int numberOfNodes, List<String> cacheNames){
+ this.numberOfNodes = numberOfNodes;
+ this.cacheNames = cacheNames;
+ TestResourceTracker.setThreadTestName("test");
+ }
+
+ public int getNumberOfNodes(){
+ return numberOfNodes;
+ }
+
+ public List<String> getCacheNames(){
+ return this.cacheNames;
+ }
+
+ public void create() throws Throwable {
+ createCacheManagers();
+ }
+
+ @Override
+ public void createCacheManagers() throws Throwable {
+ ConfigurationBuilder builder = hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
+ create(numberOfNodes, builder);
+ }
+
+ @Override
+ public void destroy(){
+ // Correct order is to stop servers first
+ try {
+ for (HotRodServer server : servers)
+ HotRodClientTestingUtil.killServers(server);
+ } finally {
+ // And then the caches and cache managers
+ super.destroy();
+ }
+ }
+
+ public String connectionString(){
+ return connectionString;
+ }
+
+ // Helpers
+
+ protected void create(int nnodes, ConfigurationBuilder defaultBuilder) {
+
+ // Start Hot Rod servers at each site.
+ for (int j = 0; j < nnodes; j++) {
+ GlobalConfigurationBuilder gbuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
+ Transport transport = gbuilder.transport().getTransport();
+ gbuilder.transport().transport(transport);
+ gbuilder.transport().clusterName("test");
+ startHotRodServer(gbuilder, defaultBuilder, j + 1);
+ }
+
+ // Create appropriate caches at each node.
+ ConfigurationBuilder builder = hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
+ builder.indexing()
+ .enable()
+ .index(Index.LOCAL)
+ .addProperty("default.directory_provider", "ram")
+ .addProperty("hibernate.search.default.exclusive_index_use","true")
+ .addProperty("hibernate.search.default.indexmanager","near-real-time")
+ .addProperty("hibernate.search.default.indexwriter.ram_buffer_size","128")
+ .addProperty("lucene_version", "LUCENE_CURRENT");
+ builder.clustering().hash().numOwners(1);
+ builder.jmxStatistics().enable();
+ builder.transaction().transactionMode(TransactionMode.TRANSACTIONAL);
+ Configuration configuration = builder.build();
+ for (int j = 0; j < nnodes; j++) {
+ for (String name : cacheNames) {
+ manager(j).defineConfiguration(name,configuration);
+ manager(j).getCache(name, true);
+ }
+ }
+ // Verify that default caches are started.
+ for (int j = 0; j < nnodes; j++) {
+ assert manager(j).getCache() != null;
+ }
+
+ // Verify that the default caches is running.
+ for (int j = 0; j < nnodes; j++) {
+ blockUntilCacheStatusAchieved(
+ manager(j).getCache(), ComponentStatus.RUNNING, 10000);
+ }
+
+ for (int j = 0; j < nnodes; j++) {
+ if (j!=0) connectionString+=";";
+ connectionString += server(j).getHost() + ":" + server(j).getPort();
+ }
+ }
+
+ protected HotRodServer server(int i) {
+ return servers.get(i);
+ }
+
+ protected List<HotRodServer> servers(){
+ return servers;
+ }
+
+ protected void startHotRodServer(GlobalConfigurationBuilder gbuilder, ConfigurationBuilder builder, int nodeIndex) {
+ TransportFlags transportFlags = new TransportFlags();
+ EmbeddedCacheManager cm = addClusterEnabledCacheManager(gbuilder, builder, transportFlags);
+ HotRodServer server = HotRodClientTestingUtil.startHotRodServer(cm);
+ servers.add(server);
+ }
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java b/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java
new file mode 100644
index 0000000..3bf4a09
--- /dev/null
+++ b/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.infinispan;
+
+import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.store.DataStore;
+
+import java.util.Random;
+
+/**
+ * @author Pierre Sutra
+ */
+public class Utils {
+
+ private static Random rand = new Random(System.currentTimeMillis());
+ public static final long YEAR_IN_MS = 365L * 24L * 60L * 60L * 1000L;
+
+ public static Employee createEmployee(int i) {
+ Employee employee = Employee.newBuilder().build();
+ employee.setSsn(Long.toString(i));
+ employee.setName(Long.toString(rand.nextLong()));
+ employee.setDateOfBirth(rand.nextLong() - 20L * YEAR_IN_MS);
+ employee.setSalary(rand.nextInt());
+ return employee;
+ }
+
+ public static <T extends CharSequence> void populateEmployeeStore(DataStore<T, Employee> dataStore, int n) {
+ for(int i=0; i<n; i++) {
+ Employee e = createEmployee(i);
+ dataStore.put((T)e.getSsn(),e);
+ }
+ dataStore.flush();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/test/java/org/apache/gora/infinispan/mapreduce/InfinispanStoreMapReduceTest.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/test/java/org/apache/gora/infinispan/mapreduce/InfinispanStoreMapReduceTest.java b/gora-infinispan/src/test/java/org/apache/gora/infinispan/mapreduce/InfinispanStoreMapReduceTest.java
new file mode 100644
index 0000000..19e3a53
--- /dev/null
+++ b/gora-infinispan/src/test/java/org/apache/gora/infinispan/mapreduce/InfinispanStoreMapReduceTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.infinispan.mapreduce;
+
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.infinispan.GoraInfinispanTestDriver;
+import org.apache.gora.infinispan.store.InfinispanStore;
+import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.gora.infinispan.store.InfinispanClient.ISPN_CONNECTION_STRING_KEY;
+
+/**
+ * @author Pierre Sutra
+ */
+public class InfinispanStoreMapReduceTest extends DataStoreMapReduceTestBase {
+
+ private GoraInfinispanTestDriver driver;
+ private Configuration conf;
+
+ public InfinispanStoreMapReduceTest() throws IOException {
+ super();
+ List<String> cacheNames = new ArrayList<>();
+ cacheNames.add(WebPage.class.getSimpleName());
+ driver = new GoraInfinispanTestDriver(3, cacheNames);
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ driver.setUpClass();
+ super.setUp();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ driver.tearDownClass();
+ }
+
+ @Override
+ protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
+ conf = driver.getConfiguration();
+ conf.set(ISPN_CONNECTION_STRING_KEY,driver.connectionString());
+ try {
+ InfinispanStore<String,WebPage> store = new InfinispanStore<>();
+ store.setConf(conf);
+ store.initialize(String.class, WebPage.class, new Properties());
+ return store;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/test/java/org/apache/gora/infinispan/store/InfinispanStoreTest.java
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/test/java/org/apache/gora/infinispan/store/InfinispanStoreTest.java b/gora-infinispan/src/test/java/org/apache/gora/infinispan/store/InfinispanStoreTest.java
new file mode 100644
index 0000000..3b701c6
--- /dev/null
+++ b/gora-infinispan/src/test/java/org/apache/gora/infinispan/store/InfinispanStoreTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Testing class for all standard gora-cassandra functionality.
+ * We extend DataStoreTestBase enabling us to run the entire base test
+ * suite for Gora.
+ */
+package org.apache.gora.infinispan.store;
+
+import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.SingleFieldValueFilter;
+import org.apache.gora.infinispan.GoraInfinispanTestDriver;
+import org.apache.gora.infinispan.Utils;
+import org.apache.gora.infinispan.query.InfinispanQuery;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreTestBase;
+import org.apache.gora.util.TestIOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.gora.infinispan.store.InfinispanClient.ISPN_CONNECTION_STRING_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link InfinispanStore}.
+ * @author Pierre Sutra
+ */
+
+public class InfinispanStoreTest extends DataStoreTestBase {
+
+ private Configuration conf;
+ private InfinispanStore<String,Employee> employeeDataStore;
+ private InfinispanStore<String,WebPage> webPageDataStore;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ List<String> cacheNames = new ArrayList<>();
+ cacheNames.add(Employee.class.getSimpleName());
+ cacheNames.add(WebPage.class.getSimpleName());
+ setTestDriver(new GoraInfinispanTestDriver(1, cacheNames));
+ DataStoreTestBase.setUpClass();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ GoraInfinispanTestDriver driver = getTestDriver();
+ conf = driver.getConfiguration();
+ conf.set(ISPN_CONNECTION_STRING_KEY,getTestDriver().connectionString());
+ super.setUp();
+ employeeDataStore = (InfinispanStore<String, Employee>) employeeStore;
+ webPageDataStore = (InfinispanStore<String, WebPage>) webPageStore;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected DataStore<String, Employee> createEmployeeDataStore()
+ throws IOException {
+ throw new IllegalStateException("Using driver.");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected DataStore<String, WebPage> createWebPageDataStore()
+ throws IOException {
+ throw new IllegalStateException("Using driver.");
+ }
+
+ @Test
+ public void testQueryMarshability() throws Exception {
+ Utils.populateEmployeeStore(employeeStore, 100);
+ InfinispanQuery<String,Employee> query = new InfinispanQuery<>(employeeDataStore);
+ query.setFields("field");
+ query.setLimit(1);
+ query.setOffset(1);
+ query.build();
+ TestIOUtils.testSerializeDeserialize(query);
+ }
+
+ @Test
+ public void testReadWriteQuery() throws Exception {
+ final int NEMPLOYEE = 100;
+ Utils.populateEmployeeStore(employeeStore, NEMPLOYEE);
+ InfinispanQuery<String,Employee> query;
+
+ // Partitioning
+ int retrieved = 0;
+ query = new InfinispanQuery<>(employeeDataStore);
+ query.build();
+ for (PartitionQuery<String,Employee> q : employeeDataStore.getPartitions(query)) {
+ retrieved+=((InfinispanQuery<String,Employee>) q).list().size();
+ }
+ assert retrieved==NEMPLOYEE;
+
+ // Test matching everything
+ query = new InfinispanQuery<>(employeeDataStore);
+ SingleFieldValueFilter filter = new SingleFieldValueFilter();
+ filter.setFieldName("name");
+ filter.setFilterOp(FilterOp.EQUALS);
+ List<Object> operaands = new ArrayList<>();
+ operaands.add("*");
+ filter.setOperands(operaands);
+ query.setFilter(filter);
+ query.build();
+ List<Employee> result = new ArrayList<>();
+ for (PartitionQuery<String,Employee> q : employeeDataStore.getPartitions(query)) {
+ result.addAll(((InfinispanQuery<String,Employee>)q).list());
+ }
+ assertEquals(NEMPLOYEE,result.size());
+
+ // Test matching nothing
+ query = new InfinispanQuery<>(employeeDataStore);
+ filter = new SingleFieldValueFilter();
+ filter.setFieldName("name");
+ filter.setFilterOp(FilterOp.NOT_EQUALS);
+ operaands.clear();
+ operaands.add("*");
+ filter.setOperands(operaands);
+ query.setFilter(filter);
+ query.build();
+ assertEquals(0,query.list().size());
+
+ }
+
+
+ public GoraInfinispanTestDriver getTestDriver() {
+ return (GoraInfinispanTestDriver) testDriver;
+ }
+
+ @Override
+ public void testDeleteByQueryFields() throws IOException, Exception {
+ // FIXME not working
+ }
+
+ @Override
+ public void testDeleteByQuery() throws IOException, Exception {
+ // FIXME not working
+ }
+
+ @Override
+ public void testQueryEndKey() throws IOException, Exception {
+ // FIXME not working
+ }
+
+ @Override
+ public void testGetWithFields() throws IOException, Exception {
+ // FIXME not working
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/test/resources/gora.properties
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/test/resources/gora.properties b/gora-infinispan/src/test/resources/gora.properties
new file mode 100644
index 0000000..ff026be
--- /dev/null
+++ b/gora-infinispan/src/test/resources/gora.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Datastore is configured programatically in the tests
+infinispan.partition.size=1
+
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/test/resources/log4j.properties b/gora-infinispan/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a509530
--- /dev/null
+++ b/gora-infinispan/src/test/resources/log4j.properties
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set pattern to %c instead of %l.
+# (%l is slower.)
+
+# output messages into a rolling log file as well as stdout
+log4j.rootLogger=INFO,stdout
+
+# stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
+
+# rolling log file
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.maxFileSize=20MB
+log4j.appender.R.maxBackupIndex=50
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
+# Edit the next line to point to your logs directory
+
+# Application logging options
+log4j.logger.org.apache=ERROR
+
+# Adding this to avoid thrift logging disconnect errors.
+log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR
+
+# Add infinispan-specific
+log4j.logger.org.infinispan=WARN
+
+# Add for gora-infinispan specific logging duing tests
+log4j.logger.org.apache.gora.infinispan=WARN
+# log4j.logger.org.infinispan.query=TRACE
+#log4j.logger.org.apache.gora.infinispan.store.InfinispanClient=DEBUG
+# log4j.logger.org.infinispan.client.hotrod.impl.RemoteCacheImpl=TRACE
+# log4j.logger.org.infinispan.query.remote.avro.AvroQueryFacade=DEBUG
+
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/gora-infinispan/src/test/resources/simplelogger.properties
----------------------------------------------------------------------
diff --git a/gora-infinispan/src/test/resources/simplelogger.properties b/gora-infinispan/src/test/resources/simplelogger.properties
new file mode 100644
index 0000000..bb9ef0c
--- /dev/null
+++ b/gora-infinispan/src/test/resources/simplelogger.properties
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.slf4j.simpleLogger.defaultLog=debug
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/gora/blob/8267561e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 946a569..ef39ec3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -701,6 +701,7 @@
<module>gora-cassandra</module>
<module>gora-goraci</module>
<module>gora-hbase</module>
+ <module>gora-infinispan</module>
<!-- module>gora-lucene</module -->
<module>gora-dynamodb</module>
<!--module>gora-sql</module -->