You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/03 17:36:32 UTC

[1/2] incubator-ignite git commit: IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-948 5b8d2fd27 -> 29c6d28a5 (forced update)


IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2aa1ace0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2aa1ace0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2aa1ace0

Branch: refs/heads/ignite-948
Commit: 2aa1ace0cdbf0fbbbcd5893958bddb7869742ce0
Parents: d0157d4
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Jun 2 19:34:49 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Jun 2 19:34:49 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |   8 +-
 .../processors/cache/QueryCursorImpl.java       |  23 ++--
 .../processors/cache/query/QueryCursorEx.java   |   8 ++
 .../processors/query/GridQueryIndexing.java     |   2 +-
 .../processors/query/GridQueryProcessor.java    |  13 ++-
 ...niteDynamicCacheWithConfigStartSelfTest.java | 108 +++++++++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   |  43 +++++---
 .../h2/twostep/GridReduceQueryExecutor.java     |   8 +-
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  12 ++-
 .../cache/IgniteCacheAbstractQuerySelfTest.java |  23 ++++
 modules/spark/pom.xml                           |  18 ++--
 .../org/apache/ignite/spark/IgniteContext.scala |   3 +
 .../org/apache/ignite/spark/IgniteRDD.scala     |  68 ++++++++++--
 .../spark/examples/IgniteProcessExample.scala   |   2 +-
 .../org/apache/ignite/spark/IgniteRddSpec.scala |  38 +++----
 15 files changed, 291 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 176543b..b3914e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -497,10 +497,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                 return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal());
 
             if (qry instanceof SqlQuery) {
-                SqlQuery p = (SqlQuery)qry;
+                final SqlQuery p = (SqlQuery)qry;
 
                 if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
-                    return (QueryCursor<R>)new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p));
+                    return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
+                        @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                            return ctx.kernalContext().query().<K, V>queryLocal(ctx, p);
+                        }
+                    });
 
                 return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 7cb9efc..d68c377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -27,6 +27,9 @@ import java.util.*;
  * Query cursor implementation.
  */
 public class QueryCursorImpl<T> implements QueryCursorEx<T> {
+    /** Query executor. */
+    private Iterable<T> iterExec;
+
     /** */
     private Iterator<T> iter;
 
@@ -34,18 +37,18 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     private boolean iterTaken;
 
     /** */
-    private Collection<GridQueryFieldMetadata> fieldsMeta;
+    private List<GridQueryFieldMetadata> fieldsMeta;
 
     /**
-     * @param iter Iterator.
+     * @param iterExec Query executor.
      */
-    public QueryCursorImpl(Iterator<T> iter) {
-        this.iter = iter;
+    public QueryCursorImpl(Iterable<T> iterExec) {
+        this.iterExec = iterExec;
     }
 
     /** {@inheritDoc} */
     @Override public Iterator<T> iterator() {
-        if (iter == null)
+        if (iter == null && iterTaken)
             throw new IgniteException("Cursor is closed.");
 
         if (iterTaken)
@@ -53,12 +56,16 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
 
         iterTaken = true;
 
+        iter = iterExec.iterator();
+
+        assert iter != null;
+
         return iter;
     }
 
     /** {@inheritDoc} */
     @Override public List<T> getAll() {
-        ArrayList<T> all = new ArrayList<>();
+        List<T> all = new ArrayList<>();
 
         try {
             for (T t : this) // Implicitly calls iterator() to do all checks.
@@ -103,14 +110,14 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     /**
      * @param fieldsMeta SQL Fields query result metadata.
      */
-    public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) {
+    public void fieldsMeta(List<GridQueryFieldMetadata> fieldsMeta) {
         this.fieldsMeta = fieldsMeta;
     }
 
     /**
      * @return SQL Fields query result metadata.
      */
-    public Collection<GridQueryFieldMetadata> fieldsMeta() {
+    @Override public List<GridQueryFieldMetadata> fieldsMeta() {
         return fieldsMeta;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
index bf1d4ea..5e19b99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
+
+import java.util.*;
 
 /**
  * Extended query cursor interface allowing for "getAll" to output data into destination other than Collection.
@@ -32,6 +35,11 @@ public interface QueryCursorEx<T> extends QueryCursor<T> {
     public void getAll(Consumer<T> c) throws IgniteCheckedException;
 
     /**
+     * @return Query metadata.
+     */
+    public List<GridQueryFieldMetadata> fieldsMeta();
+
+    /**
      * Query value consumer.
      */
     public static interface Consumer<T> {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 0bb820d..cc0916a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -60,7 +60,7 @@ public interface GridQueryIndexing {
      * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry);
+    public Iterable<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry);
 
     /**
      * Parses SQL query into two step query and executes it.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index cd4d543..31337ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -532,7 +532,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) {
+    public Iterable<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) {
         checkxEnabled();
 
         if (!busyLock.enterBusy())
@@ -670,7 +670,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param qry Query.
      * @return Iterator.
      */
-    public QueryCursor<List<?>> queryLocalFields(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) {
+    public QueryCursor<List<?>> queryLocalFields(final GridCacheContext<?,?> cctx, SqlFieldsQuery qry) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
@@ -679,7 +679,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             String sql = qry.getSql();
             Object[] args = qry.getArgs();
 
-            GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
+            final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
 
             if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                 ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -697,8 +697,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         null));
             }
 
-            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
-                new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()));
+            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable());
+                }
+            });
 
             cursor.fieldsMeta(res.metaData());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
new file mode 100644
index 0000000..704cf26
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String CACHE_NAME = "partitioned";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        if (!client)
+            cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, String.class);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnClient() throws Exception {
+        int srvCnt = 3;
+
+        startGrids(srvCnt);
+
+        try {
+            client = true;
+
+            int clientCnt = 12;
+
+            IgniteEx[] clients = new IgniteEx[clientCnt];
+
+            for (int i = 0; i < clients.length; i++)
+                clients[i] = startGrid(i + srvCnt);
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    final int idx0 = idx.getAndIncrement();
+
+                    ignite(idx0).cache(CACHE_NAME).get(1);
+
+                    return null;
+                }
+            }, clients.length, "runner");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 200da77..6ec329f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -592,7 +592,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws SQLException If failed.
      */
     private static List<GridQueryFieldMetadata> meta(ResultSetMetaData rsMeta) throws SQLException {
-        ArrayList<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
+        List<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
 
         for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
             String schemaName = rsMeta.getSchemaName(i);
@@ -771,8 +771,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
-        return rdcQryExec.query(cctx, qry);
+    @Override public Iterable<List<?>> queryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry) {
+        return new Iterable<List<?>>() {
+            @Override public Iterator<List<?>> iterator() {
+                return rdcQryExec.query(cctx, qry);
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -802,25 +806,30 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
 
-        final Iterator<List<?>> iter0 = res.iterator();
+        final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
+            @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                final Iterator<List<?>> iter0 = res.iterator();
 
-        Iterator<Cache.Entry<K,V>> iter = new Iterator<Cache.Entry<K,V>>() {
-            @Override public boolean hasNext() {
-                return iter0.hasNext();
-            }
+                return new Iterator<Cache.Entry<K,V>>() {
+                    @Override public boolean hasNext() {
+                        return iter0.hasNext();
+                    }
 
-            @Override public Cache.Entry<K,V> next() {
-                List<?> l = iter0.next();
+                    @Override public Cache.Entry<K,V> next() {
+                        List<?> l = iter0.next();
 
-                return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1));
-            }
+                        return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1));
+                    }
 
-            @Override public void remove() {
-                throw new UnsupportedOperationException();
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
             }
         };
 
-        return new QueryCursorImpl<Cache.Entry<K,V>>(iter) {
+        // No metadata for SQL queries.
+        return new QueryCursorImpl<Cache.Entry<K,V>>(converted) {
             @Override public void close() {
                 res.close();
             }
@@ -844,7 +853,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         GridCacheTwoStepQuery twoStepQry;
-        Collection<GridQueryFieldMetadata> meta;
+        List<GridQueryFieldMetadata> meta;
 
         try {
             twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated());
@@ -863,7 +872,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         twoStepQry.pageSize(qry.getPageSize());
 
-        QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(cctx, twoStepQry);
+        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(queryTwoStep(cctx, twoStepQry));
 
         cursor.fieldsMeta(meta);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 50c30a5..cfacfcf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -269,7 +269,7 @@ public class GridReduceQueryExecutor {
      * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
+    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
         long qryReqId = reqIdGen.incrementAndGet();
 
         QueryRun r = new QueryRun();
@@ -356,7 +356,7 @@ public class GridReduceQueryExecutor {
 //                dropTable(r.conn, tbl.getName()); TODO
             }
 
-            return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
+            return new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable());
         }
         catch (IgniteCheckedException | InterruptedException | RuntimeException e) {
             U.closeQuiet(r.conn);
@@ -381,7 +381,7 @@ public class GridReduceQueryExecutor {
      * @return Cursor for plans.
      * @throws IgniteCheckedException if failed.
      */
-    private QueryCursor<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
+    private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
         throws IgniteCheckedException {
         List<List<?>> lists = new ArrayList<>();
 
@@ -403,7 +403,7 @@ public class GridReduceQueryExecutor {
 
         lists.add(F.asList(getPlan(rs)));
 
-        return new QueryCursorImpl<>(lists.iterator());
+        return lists.iterator();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 4e9bf31..dd7c879 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -127,9 +127,17 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
 
         q.addMapQuery("_cnts_", "select count(*) x from \"partitioned\".FactPurchase where ? = ?", 2, 2);
 
-        Object cnt = qryProc.queryTwoStep(cache, q).getAll().iterator().next().get(0);
+        Iterator<List<?>> it = qryProc.queryTwoStep(cache, q).iterator();
 
-        assertEquals(10L, cnt);
+        try {
+            Object cnt = it.next().get(0);
+
+            assertEquals(10L, cnt);
+        }
+        finally {
+            if (it instanceof AutoCloseable)
+                ((AutoCloseable)it).close();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index fa62361..0d45711 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -987,6 +988,28 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
     /**
      * @throws Exception If failed.
      */
+    public void testFieldsQueryMetadata() throws Exception {
+        IgniteCache<UUID, Person> cache = ignite.cache(null);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(UUID.randomUUID(), new Person("name-" + i, (i + 1) * 100));
+
+        QueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select name, salary from Person where name like ?")
+            .setArgs("name-"));
+
+        assertTrue(cur instanceof QueryCursorEx);
+
+        QueryCursorEx<List<?>> curEx = (QueryCursorEx<List<?>>)cur;
+
+        List<GridQueryFieldMetadata> meta = curEx.fieldsMeta();
+
+        assertNotNull(meta);
+        assertEquals(2, meta.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void checkSqlQueryEvents() throws Exception {
         final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 84055d6..a4a25f5 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -31,7 +31,7 @@
     </parent>
 
     <artifactId>ignite-spark</artifactId>
-    <version>1.0.7-SNAPSHOT</version>
+    <version>1.1.1-SNAPSHOT</version>
 
     <dependencies>
         <dependency>
@@ -58,16 +58,12 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
             <version>1.3.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.twitter</groupId>
-                    <artifactId>chill_2.11</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.twitter</groupId>
-                    <artifactId>chill-java</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>1.3.1</version>
         </dependency>
 
         <!-- Test dependencies -->

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 6259665..5cdbad0 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.IgnitionEx
 import org.apache.ignite.{Ignition, Ignite}
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
 
 /**
  * Ignite context.
@@ -42,6 +43,8 @@ class IgniteContext[K, V](
         this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1())
     }
 
+    val sqlContext = new SQLContext(sparkContext)
+
     def fromCache(cacheName: String): IgniteRDD[K, V] = {
         new IgniteRDD[K, V](this, cacheName, null)
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index f286b58..0b8e845 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -18,14 +18,18 @@ package org.apache.ignite.spark
 
 import javax.cache.Cache
 
-import org.apache.ignite.cache.query.{SqlFieldsQuery, SqlQuery, ScanQuery}
+import org.apache.ignite.cache.query._
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
 import org.apache.ignite.lang.IgniteUuid
-import org.apache.ignite.spark.impl.{IgniteAbstractRDD, IgniteSqlRDD, IgnitePartition, IgniteQueryIterator}
+import org.apache.ignite.spark.impl._
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
 import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql._
+import org.apache.spark._
 
 import scala.collection.JavaConversions._
 
@@ -98,12 +102,16 @@ class IgniteRDD[K, V] (
         new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue))
     }
 
-    def sql(sql: String, args: Any*): RDD[Seq[Any]] = {
+    def sql(sql: String, args: Any*): DataFrame = {
         val qry = new SqlFieldsQuery(sql)
 
         qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
 
-        new IgniteSqlRDD[Seq[Any], java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ list)
+        val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
+
+        val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list))
+
+        ic.sqlContext.createDataFrame(rowRdd, schema)
     }
 
     def saveValues(rdd: RDD[V]) = {
@@ -138,10 +146,6 @@ class IgniteRDD[K, V] (
             // Make sure to deploy the cache
             ensureCache()
 
-            val locNode = ig.cluster().localNode()
-
-            val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
-
             val streamer = ig.dataStreamer[K, V](cacheName)
 
             try {
@@ -159,7 +163,49 @@ class IgniteRDD[K, V] (
         ensureCache().removeAll()
     }
 
-    private def affinityKeyFunc(value: V, node: ClusterNode): Object = {
-        IgniteUuid.randomUuid()
+    /**
+     * Builds spark schema from query metadata.
+     *
+     * @param fieldsMeta Fields metadata.
+     * @return Spark schema.
+     */
+    private def buildSchema(fieldsMeta: java.util.List[GridQueryFieldMetadata]): StructType = {
+        new StructType(fieldsMeta.map(i ⇒ new StructField(i.fieldName(), dataType(i.fieldTypeName()), nullable = true))
+            .toArray)
+    }
+
+    /**
+     * Gets Spark data type based on type name.
+     *
+     * @param typeName Type name.
+     * @return Spark data type.
+     */
+    private def dataType(typeName: String): DataType = typeName match {
+        case "java.lang.Boolean" ⇒ BooleanType
+        case "java.lang.Byte" ⇒ ByteType
+        case "java.lang.Short" ⇒ ShortType
+        case "java.lang.Integer" ⇒ IntegerType
+        case "java.lang.Long" ⇒ LongType
+        case "java.lang.Float" ⇒ FloatType
+        case "java.lang.Double" ⇒ DoubleType
+        case "java.lang.String" ⇒ StringType
+        case "java.util.Date" ⇒ DateType
+        case "java.sql.Timestamp" ⇒ TimestampType
+        case "[B" ⇒ BinaryType
+
+        case _ ⇒ StructType(new Array[StructField](0)) // TODO Do we need to fill user types?
+    }
+
+    /**
+     * Generates affinity key for given cluster node.
+     *
+     * @param value Value to generate key for.
+     * @param node Node to generate key for.
+     * @return Affinity key.
+     */
+    private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
+        val aff = ic.ignite().affinity[IgniteUuid](cacheName)
+
+        Stream.continually(IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node)).get
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
index db8b5a3..ab91c62 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
@@ -47,6 +47,6 @@ object IgniteProcessExample {
         ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect()
 
         // SQL fields query
-        val sqlRes: RDD[Seq[Any]] = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20)
+        val df = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20)
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
index 68273da..26ce693 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
@@ -117,39 +117,29 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
 
                 val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
 
+                import ic.sqlContext.implicits._
+
                 cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
 
-                val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect()
+                val df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000)
+
+                df.printSchema()
+
+                val res = df.collect()
 
                 assert(res.length == 1, "Invalid result length")
-                assert(50 == res(0).head, "Invalid result")
+                assert(50 == res(0)(0), "Invalid result")
                 assert("name50" == res(0)(1), "Invalid result")
                 assert(5000 == res(0)(2), "Invalid result")
 
-                assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count")
-            }
-            finally {
-                sc.stop()
-            }
-        }
-
-        it("should successfully store values RDD") {
-            val sc = new SparkContext("local[*]", "test")
-
-            try {
-                val ic = new IgniteContext[String, Entity](sc,
-                    () ⇒ configuration("client", client = true))
+                val df0 = cache.sql("select id, name, salary from Entity").where('NAME === "name50" and 'SALARY === 5000)
 
-                val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
+                val res0 = df0.collect()
 
-                cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
-
-                val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect()
-
-                assert(res.length == 1, "Invalid result length")
-                assert(50 == res(0).head, "Invalid result")
-                assert("name50" == res(0)(1), "Invalid result")
-                assert(5000 == res(0)(2), "Invalid result")
+                assert(res0.length == 1, "Invalid result length")
+                assert(50 == res0(0)(0), "Invalid result")
+                assert("name50" == res0(0)(1), "Invalid result")
+                assert(5000 == res0(0)(2), "Invalid result")
 
                 assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count")
             }


[2/2] incubator-ignite git commit: ignite-948 Add Java API for Ignite RDD

Posted by sb...@apache.org.
ignite-948 Add Java API for Ignite RDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29c6d28a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29c6d28a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29c6d28a

Branch: refs/heads/ignite-948
Commit: 29c6d28a530a09b4724c7f3463dc193090d62de6
Parents: 2aa1ace
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 2 01:09:17 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Jun 3 18:36:00 2015 +0300

----------------------------------------------------------------------
 modules/spark/pom.xml                           |  14 ++
 .../spark/examples/java/ColocationTest.java     |  88 +++++++
 .../examples/java/IgniteProcessExample.java     |  82 +++++++
 .../spark/examples/java/IgniteStoreExample.java |  72 ++++++
 .../spark/examples/java/package-info.java       |  21 ++
 .../org/apache/ignite/spark/IgniteRDD.scala     |  10 +-
 .../apache/ignite/spark/JavaIgniteContext.scala |  55 +++++
 .../org/apache/ignite/spark/JavaIgniteRDD.scala |  94 +++++++
 .../spark/impl/JavaIgniteAbstractRDD.scala      |  34 +++
 .../ignite/spark/JavaIgniteRDDSelfTest.java     | 244 +++++++++++++++++++
 parent/pom.xml                                  |   4 +
 11 files changed, 713 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index a4a25f5..a36ee87 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -87,6 +87,20 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-beans</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
new file mode 100644
index 0000000..dd687cf
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+
+import scala.Tuple2;
+
+import java.util.*;
+
+/**
+ * Colocation test example.
+ */
+public class ColocationTest {
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Colocation test");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Colocation test", conf);
+
+        JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() {
+            @Override public IgniteConfiguration apply() {
+                return ExampleConfiguration.configuration();
+            }
+        });
+
+        JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned");
+
+        List<Integer> seq = new ArrayList<>();
+
+        long sum = 0;
+
+        for (int i = 0; i < 100000; i++) {
+            seq.add(i);
+
+            sum += i;
+        }
+
+        IgniteClosure<Integer, Tuple2<Integer, Integer>> f = new IgniteClosure<Integer, Tuple2<Integer, Integer>>() {
+            @Override public Tuple2<Integer, Integer> apply(Integer i) {
+                return new Tuple2<>(i, i);
+            }
+        };
+
+        JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(F.transformList(seq, f), 48);
+
+        cache.savePairs(rdd);
+
+        // Execute parallel sum.
+        System.out.println("Local sum: " + sum);
+
+        System.out.println("Distributed sum: " + cache.map(new Function<Tuple2<Integer,Integer>, Integer>() {
+            @Override public Integer call(Tuple2<Integer, Integer> t) throws Exception {
+                return t._2();
+            }
+        }).fold(0, new Function2<Integer, Integer, Integer>() {
+            public Integer call(Integer x, Integer y) {
+                return x + y;
+            }
+        }));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
new file mode 100644
index 0000000..e69ca5f
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.*;
+
+import scala.*;
+
+import java.lang.Boolean;
+import java.util.*;
+
+/**
+ * Ignite process example.
+ */
+public class IgniteProcessExample {
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Ignite processing example");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Ignite processing example", conf);
+
+        JavaIgniteContext<Object, String> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() {
+            @Override public IgniteConfiguration apply() {
+                return ExampleConfiguration.configuration();
+            }
+        });
+
+        // Search for lines containing "Ignite".
+        JavaIgniteRDD<Object, String> scanRdd = ignite.fromCache("partitioned");
+
+        JavaRDD<String> processedRdd = scanRdd.filter(new Function<Tuple2<Object, String>, Boolean>() {
+            @Override public Boolean call(Tuple2<Object, String> t) throws Exception {
+                System.out.println("Analyzing line: " + t._2());
+
+                t._2().contains("Ignite");
+
+                return true;
+            }
+        }).map(new Function<Tuple2<Object, String>, String>() {
+            @Override public String call(Tuple2<Object, String> t) throws Exception {
+                return t._2();
+            }
+        });
+
+        // Create a new cache for results.
+        JavaIgniteRDD<Object, String> results = ignite.fromCache("results");
+
+        results.saveValues(processedRdd);
+
+        // SQL query
+        ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect();
+
+        // SQL fields query
+        DataFrame df = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
new file mode 100644
index 0000000..c1299ef
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
+
+import scala.*;
+
+import java.lang.*;
+import java.lang.Boolean;
+
+/**
+ * Ignite store example.
+ */
+public class IgniteStoreExample {
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Ignite processing example");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Ignite processing example", conf);
+
+        JavaIgniteContext<String, String> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() {
+            @Override public IgniteConfiguration apply() {
+                return ExampleConfiguration.configuration();
+            }
+        });
+
+
+        JavaRDD<String> lines = sc.textFile(args[0]).filter(new Function<String, Boolean>() {
+            @Override public Boolean call(String s) throws Exception {
+                System.out.println("Read line: " + s);
+
+                return s.contains("Ignite");
+            }
+        });
+
+        ignite.fromCache("partitioned").saveValues(lines);
+
+        ignite.fromCache("partitioned").savePairs(lines.mapToPair(new PairFunction<String, String, String>() {
+            @Override public Tuple2<String, String> call(String s) throws Exception {
+                return new Tuple2<>(s, s);
+            }
+        }));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
new file mode 100644
index 0000000..e3243bf
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Demonstrates usage of Ignite and Spark from Java.
+ */
+package org.apache.ignite.spark.examples.java;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0b8e845..742d7ee 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -43,9 +43,9 @@ import scala.collection.JavaConversions._
  * @tparam V Value type.
  */
 class IgniteRDD[K, V] (
-    ic: IgniteContext[K, V],
-    cacheName: String,
-    cacheCfg: CacheConfiguration[K, V]
+    val ic: IgniteContext[K, V],
+    val cacheName: String,
+    val cacheCfg: CacheConfiguration[K, V]
 ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
     /**
      * Computes iterator based on given partition.
@@ -73,7 +73,7 @@ class IgniteRDD[K, V] (
      *
      * @return Partitions.
      */
-    override protected def getPartitions: Array[Partition] = {
+    override protected[spark] def getPartitions: Array[Partition] = {
         ensureCache()
 
         val parts = ic.ignite().affinity(cacheName).partitions()
@@ -87,7 +87,7 @@ class IgniteRDD[K, V] (
      * @param split Split partition.
      * @return
      */
-    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = {
         ensureCache()
 
         ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..6c52e65
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+class JavaIgniteContext[K, V](
+    @scala.transient val sc: JavaSparkContext,
+    val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+    @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply())
+
+    def this(sc: JavaSparkContext, springUrl: String) {
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+        })
+    }
+
+    def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+    def fromCache(cacheCfg: CacheConfiguration[K, V]) =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+    def ignite(): Ignite = ic.ignite()
+
+    def close() = ic.close()
+
+    private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+    implicit val ktag: ClassTag[K] = fakeClassTag
+
+    implicit val vtag: ClassTag[V] = fakeClassTag
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..f2c9c9a
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import java.util
+
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.annotation.varargs
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+
+class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
+    extends JavaPairRDD[K, V](rdd)(JavaIgniteRDD.fakeClassTag, JavaIgniteRDD.fakeClassTag) {
+    //with JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+
+    override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
+
+    override val classTag: ClassTag[(K, V)] = JavaIgniteRDD.fakeClassTag
+
+    /**
+     * Computes iterator based on given partition.
+     *
+     * @param part Partition to use.
+     * @param context Task context.
+     * @return Partition iterator.
+     */
+    def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+        rdd.compute(part, context)
+    }
+
+    /**
+     * Gets partitions for the given cache RDD.
+     *
+     * @return Partitions.
+     */
+    protected def getPartitions: java.util.List[Partition] = {
+        new util.ArrayList[Partition](rdd.getPartitions.toSeq)
+    }
+
+    /**
+     * Gets preferred locations for the given partition.
+     *
+     * @param split Split partition.
+     * @return
+     */
+    protected def getPreferredLocations(split: Partition): Seq[String] = {
+        rdd.getPreferredLocations(split)
+    }
+
+    @varargs def objectSql(typeName: String, sql: String, args: Any*): JavaPairRDD[K, V] =
+        JavaPairRDD.fromRDD(rdd.objectSql(typeName, sql, args))
+
+    @varargs def sql(sql: String, args: Any*): DataFrame = rdd.sql(sql, args)
+
+    def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
+
+    def savePairs(jrdd: JavaPairRDD[K, V]) = {
+        val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)
+
+        rdd.savePairs(rrdd)
+    }
+
+    def clear(): Unit = rdd.clear()
+}
+
+object JavaIgniteRDD {
+    implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] =
+        new JavaIgniteRDD[K, V](rdd)
+
+    implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = rdd.rdd
+
+    def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
new file mode 100644
index 0000000..13bd3e8
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.spark.IgniteRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
+
+abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
+    extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+
+    protected def ensureCache(): IgniteCache[K, V] = {
+        // Make sure to deploy the cache
+        if (rdd.cacheCfg != null)
+            rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg)
+        else
+            rdd.ic.ignite().getOrCreateCache(rdd.cacheName)
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
new file mode 100644
index 0000000..d0ad4d4
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+
+import scala.*;
+
+import java.util.*;
+
+/**
+ * Tests for {@link JavaIgniteRDD}.
+ */
+public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 10000;
+
+    /** Cache name. */
+    private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Sum function. */
+    private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+        public Integer call(Integer x, Integer y) {
+            return x + y;
+        }
+    };
+
+    /** To pair function. */
+    private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
+        /** {@inheritDoc} */
+        @Override public Tuple2<String, String> call(Integer i) {
+            return new Tuple2<>(String.valueOf(i), "val" + i);
+        }
+    };
+
+    /** (String, Integer); pair to Integer value function. */
+    private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
+
+    /** (String, Entity) pair to Entity value function. */
+    private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
+        new PairToValueFunction<>();
+
+    /** Integer to entity function. */
+    private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+        new PairFunction<Integer, String, Entity>() {
+            @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
+                return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
+            }
+        };
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        Ignition.stop("client", false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.start(getConfiguration("grid-" + i, false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.stop("grid-" + i, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStoreDataToIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            ic.fromCache(PARTITIONED_CACHE_NAME)
+                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                String val = cache.get(String.valueOf(i));
+
+                assertNotNull("Value was not put to cache for key: " + i, val);
+                assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+            }
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadDataFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++)
+                cache.put(String.valueOf(i), i);
+
+            JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+            int sum = values.fold(0, SUM_F);
+
+            int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
+
+            assertEquals(expSum, sum);
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryObjectsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1000), 2).mapToPair(INT_TO_ENTITY_F));
+
+            List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
+                .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+            assertEquals("Invalid result length", 1, res.size());
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @param client Client.
+     */
+    private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        cfg.setClientMode(client);
+
+        cfg.setGridName(gridName);
+
+        return cfg;
+    }
+
+    /**
+     * Creates cache configuration.
+     */
+    private static CacheConfiguration<Object, Object> cacheConfiguration() {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setBackups(1);
+
+        ccfg.setName(PARTITIONED_CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, Entity.class);
+
+        return ccfg;
+    }
+
+    /**
+     * Ignite configiration provider.
+     */
+    static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
+        /** {@inheritDoc} */
+        @Override public IgniteConfiguration apply() {
+            try {
+                return getConfiguration("client", true);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
+        /** {@inheritDoc} */
+        @Override public V call(Tuple2<K, V> t) throws Exception {
+            return t._2();
+        }
+    };
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index a514e35..f5b73df 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -321,6 +321,10 @@
                                 <title>Mesos Framework</title>
                                 <packages>org.apache.ignite.mesos*</packages>
                             </group>
+                            <group>
+                                <title>Spark Integration</title>
+                                <packages>org.apache.ignite.spark.examples.java</packages>
+                            </group>
                         </groups>
                         <header>
                             <![CDATA[