You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/06/03 04:34:59 UTC
incubator-ignite git commit: IGNITE-389 - Merge branch
ignite-sprint-5 into ignite-389
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-389 d0157d4ef -> 2aa1ace0c
IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2aa1ace0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2aa1ace0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2aa1ace0
Branch: refs/heads/ignite-389
Commit: 2aa1ace0cdbf0fbbbcd5893958bddb7869742ce0
Parents: d0157d4
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Jun 2 19:34:49 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Jun 2 19:34:49 2015 -0700
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 8 +-
.../processors/cache/QueryCursorImpl.java | 23 ++--
.../processors/cache/query/QueryCursorEx.java | 8 ++
.../processors/query/GridQueryIndexing.java | 2 +-
.../processors/query/GridQueryProcessor.java | 13 ++-
...niteDynamicCacheWithConfigStartSelfTest.java | 108 +++++++++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 43 +++++---
.../h2/twostep/GridReduceQueryExecutor.java | 8 +-
.../cache/GridCacheCrossCacheQuerySelfTest.java | 12 ++-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 23 ++++
modules/spark/pom.xml | 18 ++--
.../org/apache/ignite/spark/IgniteContext.scala | 3 +
.../org/apache/ignite/spark/IgniteRDD.scala | 68 ++++++++++--
.../spark/examples/IgniteProcessExample.scala | 2 +-
.../org/apache/ignite/spark/IgniteRddSpec.scala | 38 +++----
15 files changed, 291 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 176543b..b3914e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -497,10 +497,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal());
if (qry instanceof SqlQuery) {
- SqlQuery p = (SqlQuery)qry;
+ final SqlQuery p = (SqlQuery)qry;
if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
- return (QueryCursor<R>)new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p));
+ return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
+ @Override public Iterator<Cache.Entry<K, V>> iterator() {
+ return ctx.kernalContext().query().<K, V>queryLocal(ctx, p);
+ }
+ });
return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 7cb9efc..d68c377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -27,6 +27,9 @@ import java.util.*;
* Query cursor implementation.
*/
public class QueryCursorImpl<T> implements QueryCursorEx<T> {
+ /** Query executor. */
+ private Iterable<T> iterExec;
+
/** */
private Iterator<T> iter;
@@ -34,18 +37,18 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
private boolean iterTaken;
/** */
- private Collection<GridQueryFieldMetadata> fieldsMeta;
+ private List<GridQueryFieldMetadata> fieldsMeta;
/**
- * @param iter Iterator.
+ * @param iterExec Query executor.
*/
- public QueryCursorImpl(Iterator<T> iter) {
- this.iter = iter;
+ public QueryCursorImpl(Iterable<T> iterExec) {
+ this.iterExec = iterExec;
}
/** {@inheritDoc} */
@Override public Iterator<T> iterator() {
- if (iter == null)
+ if (iter == null && iterTaken)
throw new IgniteException("Cursor is closed.");
if (iterTaken)
@@ -53,12 +56,16 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
iterTaken = true;
+ iter = iterExec.iterator();
+
+ assert iter != null;
+
return iter;
}
/** {@inheritDoc} */
@Override public List<T> getAll() {
- ArrayList<T> all = new ArrayList<>();
+ List<T> all = new ArrayList<>();
try {
for (T t : this) // Implicitly calls iterator() to do all checks.
@@ -103,14 +110,14 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
/**
* @param fieldsMeta SQL Fields query result metadata.
*/
- public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) {
+ public void fieldsMeta(List<GridQueryFieldMetadata> fieldsMeta) {
this.fieldsMeta = fieldsMeta;
}
/**
* @return SQL Fields query result metadata.
*/
- public Collection<GridQueryFieldMetadata> fieldsMeta() {
+ @Override public List<GridQueryFieldMetadata> fieldsMeta() {
return fieldsMeta;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
index bf1d4ea..5e19b99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.query;
import org.apache.ignite.*;
import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
+
+import java.util.*;
/**
* Extended query cursor interface allowing for "getAll" to output data into destination other than Collection.
@@ -32,6 +35,11 @@ public interface QueryCursorEx<T> extends QueryCursor<T> {
public void getAll(Consumer<T> c) throws IgniteCheckedException;
/**
+ * @return Query metadata.
+ */
+ public List<GridQueryFieldMetadata> fieldsMeta();
+
+ /**
* Query value consumer.
*/
public static interface Consumer<T> {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 0bb820d..cc0916a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -60,7 +60,7 @@ public interface GridQueryIndexing {
* @param qry Query.
* @return Cursor.
*/
- public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry);
+ public Iterable<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry);
/**
* Parses SQL query into two step query and executes it.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index cd4d543..31337ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -532,7 +532,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param qry Query.
* @return Cursor.
*/
- public QueryCursor<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) {
+ public Iterable<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) {
checkxEnabled();
if (!busyLock.enterBusy())
@@ -670,7 +670,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param qry Query.
* @return Iterator.
*/
- public QueryCursor<List<?>> queryLocalFields(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) {
+ public QueryCursor<List<?>> queryLocalFields(final GridCacheContext<?,?> cctx, SqlFieldsQuery qry) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");
@@ -679,7 +679,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
String sql = qry.getSql();
Object[] args = qry.getArgs();
- GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
+ final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -697,8 +697,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
null));
}
- QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
- new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()));
+ QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable());
+ }
+ });
cursor.fieldsMeta(res.metaData());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
new file mode 100644
index 0000000..704cf26
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String CACHE_NAME = "partitioned";
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ if (!client)
+ cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration cacheConfiguration(String cacheName) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+ ccfg.setIndexedTypes(String.class, String.class);
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnClient() throws Exception {
+ int srvCnt = 3;
+
+ startGrids(srvCnt);
+
+ try {
+ client = true;
+
+ int clientCnt = 12;
+
+ IgniteEx[] clients = new IgniteEx[clientCnt];
+
+ for (int i = 0; i < clients.length; i++)
+ clients[i] = startGrid(i + srvCnt);
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ final int idx0 = idx.getAndIncrement();
+
+ ignite(idx0).cache(CACHE_NAME).get(1);
+
+ return null;
+ }
+ }, clients.length, "runner");
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 200da77..6ec329f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -592,7 +592,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws SQLException If failed.
*/
private static List<GridQueryFieldMetadata> meta(ResultSetMetaData rsMeta) throws SQLException {
- ArrayList<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
+ List<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
String schemaName = rsMeta.getSchemaName(i);
@@ -771,8 +771,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
- return rdcQryExec.query(cctx, qry);
+ @Override public Iterable<List<?>> queryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry) {
+ return new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ return rdcQryExec.query(cctx, qry);
+ }
+ };
}
/** {@inheritDoc} */
@@ -802,25 +806,30 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
- final Iterator<List<?>> iter0 = res.iterator();
+ final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
+ @Override public Iterator<Cache.Entry<K, V>> iterator() {
+ final Iterator<List<?>> iter0 = res.iterator();
- Iterator<Cache.Entry<K,V>> iter = new Iterator<Cache.Entry<K,V>>() {
- @Override public boolean hasNext() {
- return iter0.hasNext();
- }
+ return new Iterator<Cache.Entry<K,V>>() {
+ @Override public boolean hasNext() {
+ return iter0.hasNext();
+ }
- @Override public Cache.Entry<K,V> next() {
- List<?> l = iter0.next();
+ @Override public Cache.Entry<K,V> next() {
+ List<?> l = iter0.next();
- return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1));
- }
+ return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1));
+ }
- @Override public void remove() {
- throw new UnsupportedOperationException();
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
}
};
- return new QueryCursorImpl<Cache.Entry<K,V>>(iter) {
+ // No metadata for SQL queries.
+ return new QueryCursorImpl<Cache.Entry<K,V>>(converted) {
@Override public void close() {
res.close();
}
@@ -844,7 +853,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
GridCacheTwoStepQuery twoStepQry;
- Collection<GridQueryFieldMetadata> meta;
+ List<GridQueryFieldMetadata> meta;
try {
twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated());
@@ -863,7 +872,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
twoStepQry.pageSize(qry.getPageSize());
- QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(cctx, twoStepQry);
+ QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(queryTwoStep(cctx, twoStepQry));
cursor.fieldsMeta(meta);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 50c30a5..cfacfcf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -269,7 +269,7 @@ public class GridReduceQueryExecutor {
* @param qry Query.
* @return Cursor.
*/
- public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
+ public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
long qryReqId = reqIdGen.incrementAndGet();
QueryRun r = new QueryRun();
@@ -356,7 +356,7 @@ public class GridReduceQueryExecutor {
// dropTable(r.conn, tbl.getName()); TODO
}
- return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
+ return new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable());
}
catch (IgniteCheckedException | InterruptedException | RuntimeException e) {
U.closeQuiet(r.conn);
@@ -381,7 +381,7 @@ public class GridReduceQueryExecutor {
* @return Cursor for plans.
* @throws IgniteCheckedException if failed.
*/
- private QueryCursor<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
+ private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
throws IgniteCheckedException {
List<List<?>> lists = new ArrayList<>();
@@ -403,7 +403,7 @@ public class GridReduceQueryExecutor {
lists.add(F.asList(getPlan(rs)));
- return new QueryCursorImpl<>(lists.iterator());
+ return lists.iterator();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 4e9bf31..dd7c879 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -127,9 +127,17 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
q.addMapQuery("_cnts_", "select count(*) x from \"partitioned\".FactPurchase where ? = ?", 2, 2);
- Object cnt = qryProc.queryTwoStep(cache, q).getAll().iterator().next().get(0);
+ Iterator<List<?>> it = qryProc.queryTwoStep(cache, q).iterator();
- assertEquals(10L, cnt);
+ try {
+ Object cnt = it.next().get(0);
+
+ assertEquals(10L, cnt);
+ }
+ finally {
+ if (it instanceof AutoCloseable)
+ ((AutoCloseable)it).close();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index fa62361..0d45711 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -987,6 +988,28 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
/**
* @throws Exception If failed.
*/
+ public void testFieldsQueryMetadata() throws Exception {
+ IgniteCache<UUID, Person> cache = ignite.cache(null);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(UUID.randomUUID(), new Person("name-" + i, (i + 1) * 100));
+
+ QueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select name, salary from Person where name like ?")
+ .setArgs("name-"));
+
+ assertTrue(cur instanceof QueryCursorEx);
+
+ QueryCursorEx<List<?>> curEx = (QueryCursorEx<List<?>>)cur;
+
+ List<GridQueryFieldMetadata> meta = curEx.fieldsMeta();
+
+ assertNotNull(meta);
+ assertEquals(2, meta.size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
private void checkSqlQueryEvents() throws Exception {
final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 84055d6..a4a25f5 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -31,7 +31,7 @@
</parent>
<artifactId>ignite-spark</artifactId>
- <version>1.0.7-SNAPSHOT</version>
+ <version>1.1.1-SNAPSHOT</version>
<dependencies>
<dependency>
@@ -58,16 +58,12 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.1</version>
- <exclusions>
- <exclusion>
- <groupId>com.twitter</groupId>
- <artifactId>chill_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.twitter</groupId>
- <artifactId>chill-java</artifactId>
- </exclusion>
- </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
+ <version>1.3.1</version>
</dependency>
<!-- Test dependencies -->
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 6259665..5cdbad0 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.{Ignition, Ignite}
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
/**
* Ignite context.
@@ -42,6 +43,8 @@ class IgniteContext[K, V](
this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1())
}
+ val sqlContext = new SQLContext(sparkContext)
+
def fromCache(cacheName: String): IgniteRDD[K, V] = {
new IgniteRDD[K, V](this, cacheName, null)
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index f286b58..0b8e845 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -18,14 +18,18 @@ package org.apache.ignite.spark
import javax.cache.Cache
-import org.apache.ignite.cache.query.{SqlFieldsQuery, SqlQuery, ScanQuery}
+import org.apache.ignite.cache.query._
import org.apache.ignite.cluster.ClusterNode
import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
import org.apache.ignite.lang.IgniteUuid
-import org.apache.ignite.spark.impl.{IgniteAbstractRDD, IgniteSqlRDD, IgnitePartition, IgniteQueryIterator}
+import org.apache.ignite.spark.impl._
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql._
+import org.apache.spark._
import scala.collection.JavaConversions._
@@ -98,12 +102,16 @@ class IgniteRDD[K, V] (
new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue))
}
- def sql(sql: String, args: Any*): RDD[Seq[Any]] = {
+ def sql(sql: String, args: Any*): DataFrame = {
val qry = new SqlFieldsQuery(sql)
qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
- new IgniteSqlRDD[Seq[Any], java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ list)
+ val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
+
+ val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list))
+
+ ic.sqlContext.createDataFrame(rowRdd, schema)
}
def saveValues(rdd: RDD[V]) = {
@@ -138,10 +146,6 @@ class IgniteRDD[K, V] (
// Make sure to deploy the cache
ensureCache()
- val locNode = ig.cluster().localNode()
-
- val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
-
val streamer = ig.dataStreamer[K, V](cacheName)
try {
@@ -159,7 +163,49 @@ class IgniteRDD[K, V] (
ensureCache().removeAll()
}
- private def affinityKeyFunc(value: V, node: ClusterNode): Object = {
- IgniteUuid.randomUuid()
+ /**
+ * Builds spark schema from query metadata.
+ *
+ * @param fieldsMeta Fields metadata.
+ * @return Spark schema.
+ */
+ private def buildSchema(fieldsMeta: java.util.List[GridQueryFieldMetadata]): StructType = {
+ new StructType(fieldsMeta.map(i ⇒ new StructField(i.fieldName(), dataType(i.fieldTypeName()), nullable = true))
+ .toArray)
+ }
+
+ /**
+ * Gets Spark data type based on type name.
+ *
+ * @param typeName Type name.
+ * @return Spark data type.
+ */
+ private def dataType(typeName: String): DataType = typeName match {
+ case "java.lang.Boolean" ⇒ BooleanType
+ case "java.lang.Byte" ⇒ ByteType
+ case "java.lang.Short" ⇒ ShortType
+ case "java.lang.Integer" ⇒ IntegerType
+ case "java.lang.Long" ⇒ LongType
+ case "java.lang.Float" ⇒ FloatType
+ case "java.lang.Double" ⇒ DoubleType
+ case "java.lang.String" ⇒ StringType
+ case "java.util.Date" ⇒ DateType
+ case "java.sql.Timestamp" ⇒ TimestampType
+ case "[B" ⇒ BinaryType
+
+ case _ ⇒ StructType(new Array[StructField](0)) // TODO Do we need to fill user types?
+ }
+
+ /**
+ * Generates affinity key for given cluster node.
+ *
+ * @param value Value to generate key for.
+ * @param node Node to generate key for.
+ * @return Affinity key.
+ */
+ private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
+ val aff = ic.ignite().affinity[IgniteUuid](cacheName)
+
+ Stream.continually(IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node)).get
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
index db8b5a3..ab91c62 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
@@ -47,6 +47,6 @@ object IgniteProcessExample {
ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect()
// SQL fields query
- val sqlRes: RDD[Seq[Any]] = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20)
+ val df = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
index 68273da..26ce693 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
@@ -117,39 +117,29 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
+ import ic.sqlContext.implicits._
+
cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
- val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect()
+ val df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000)
+
+ df.printSchema()
+
+ val res = df.collect()
assert(res.length == 1, "Invalid result length")
- assert(50 == res(0).head, "Invalid result")
+ assert(50 == res(0)(0), "Invalid result")
assert("name50" == res(0)(1), "Invalid result")
assert(5000 == res(0)(2), "Invalid result")
- assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count")
- }
- finally {
- sc.stop()
- }
- }
-
- it("should successfully store values RDD") {
- val sc = new SparkContext("local[*]", "test")
-
- try {
- val ic = new IgniteContext[String, Entity](sc,
- () ⇒ configuration("client", client = true))
+ val df0 = cache.sql("select id, name, salary from Entity").where('NAME === "name50" and 'SALARY === 5000)
- val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
+ val res0 = df0.collect()
- cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
-
- val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect()
-
- assert(res.length == 1, "Invalid result length")
- assert(50 == res(0).head, "Invalid result")
- assert("name50" == res(0)(1), "Invalid result")
- assert(5000 == res(0)(2), "Invalid result")
+ assert(res0.length == 1, "Invalid result length")
+ assert(50 == res0(0)(0), "Invalid result")
+ assert("name50" == res0(0)(1), "Invalid result")
+ assert(5000 == res0(0)(2), "Invalid result")
assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count")
}