You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 08:32:55 UTC
[06/24] incubator-ignite git commit: ignite-545: merge from
ignite-sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1bc3409..200da77 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.processors.query.h2.opt.*;
import org.apache.ignite.internal.processors.query.h2.sql.*;
import org.apache.ignite.internal.processors.query.h2.twostep.*;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.offheap.unsafe.*;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.indexing.*;
@@ -63,6 +65,7 @@ import java.io.*;
import java.lang.reflect.*;
import java.math.*;
import java.sql.*;
+import java.sql.Date;
import java.text.*;
import java.util.*;
import java.util.concurrent.*;
@@ -291,13 +294,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param tblToUpdate Table to update.
* @throws IgniteCheckedException In case of error.
*/
- private void removeKey(@Nullable String spaceName, Object key, TableDescriptor tblToUpdate)
+ private void removeKey(@Nullable String spaceName, CacheObject key, TableDescriptor tblToUpdate)
throws IgniteCheckedException {
try {
Collection<TableDescriptor> tbls = tables(schema(spaceName));
+ Class<?> keyCls = getClass(objectContext(spaceName), key);
+
for (TableDescriptor tbl : tbls) {
- if (tbl != tblToUpdate && tbl.type().keyClass().isAssignableFrom(key.getClass())) {
+ if (tbl != tblToUpdate && tbl.type().keyClass().isAssignableFrom(keyCls)) {
if (tbl.tbl.update(key, null, 0, true)) {
if (tbl.luceneIdx != null)
tbl.luceneIdx.remove(key);
@@ -350,8 +355,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Object k, Object v, byte[] ver,
- long expirationTime) throws IgniteCheckedException {
+ @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, CacheObject k, CacheObject v,
+ byte[] ver, long expirationTime) throws IgniteCheckedException {
TableDescriptor tbl = tableDescriptor(spaceName, type);
removeKey(spaceName, k, tbl);
@@ -368,14 +373,63 @@ public class IgniteH2Indexing implements GridQueryIndexing {
tbl.luceneIdx.store(k, v, ver, expirationTime);
}
+ /**
+ * @param o Object.
+ * @return {@code true} If it is a portable object.
+ */
+ private boolean isPortable(CacheObject o) {
+ if (ctx == null)
+ return false;
+
+ return ctx.cacheObjects().isPortableObject(o);
+ }
+
+ /**
+ * @param coctx Cache object context.
+ * @param o Object.
+ * @return Object class.
+ */
+ private Class<?> getClass(CacheObjectContext coctx, CacheObject o) {
+ return isPortable(o) ?
+ Object.class :
+ o.value(coctx, false).getClass();
+ }
+
+ /**
+ * @param space Space.
+ * @return Cache object context.
+ */
+ private CacheObjectContext objectContext(String space) {
+ if (ctx == null)
+ return null;
+
+ return ctx.cache().internalCache(space).context().cacheObjectContext();
+ }
+
+ /**
+ * @param space Space.
+ * @return Cache object context.
+ */
+ private GridCacheContext cacheContext(String space) {
+ if (ctx == null)
+ return null;
+
+ return ctx.cache().internalCache(space).context();
+ }
+
/** {@inheritDoc} */
- @Override public void remove(@Nullable String spaceName, Object key, Object val) throws IgniteCheckedException {
+ @Override public void remove(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException {
if (log.isDebugEnabled())
- log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']');
+ log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ", val=" + val + ']');
+
+ CacheObjectContext coctx = objectContext(spaceName);
+
+ Class<?> keyCls = getClass(coctx, key);
+ Class<?> valCls = val == null ? null : getClass(coctx, val);
for (TableDescriptor tbl : tables(schema(spaceName))) {
- if (tbl.type().keyClass().isAssignableFrom(key.getClass())
- && (val == null || tbl.type().valueClass().isAssignableFrom(val.getClass()))) {
+ if (tbl.type().keyClass().isAssignableFrom(keyCls)
+ && (val == null || tbl.type().valueClass().isAssignableFrom(valCls))) {
if (tbl.tbl.update(key, val, 0, true)) {
if (tbl.luceneIdx != null)
tbl.luceneIdx.remove(key);
@@ -387,14 +441,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException {
+ @Override public void onSwap(@Nullable String spaceName, CacheObject key) throws IgniteCheckedException {
Schema schema = schemas.get(schema(spaceName));
if (schema == null)
return;
+ Class<?> keyCls = getClass(objectContext(spaceName), key);
+
for (TableDescriptor tbl : schema.tbls.values()) {
- if (tbl.type().keyClass().isAssignableFrom(key.getClass())) {
+ if (tbl.type().keyClass().isAssignableFrom(keyCls)) {
try {
if (tbl.tbl.onSwap(key))
return;
@@ -407,13 +463,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
+ @Override public void onUnswap(@Nullable String spaceName, CacheObject key, CacheObject val)
throws IgniteCheckedException {
assert val != null;
+ CacheObjectContext coctx = objectContext(spaceName);
+
+ Class<?> keyCls = getClass(coctx, key);
+ Class<?> valCls = getClass(coctx, val);
+
for (TableDescriptor tbl : tables(schema(spaceName))) {
- if (tbl.type().keyClass().isAssignableFrom(key.getClass())
- && tbl.type().valueClass().isAssignableFrom(val.getClass())) {
+ if (tbl.type().keyClass().isAssignableFrom(keyCls)
+ && tbl.type().valueClass().isAssignableFrom(valCls)) {
try {
if (tbl.tbl.onUnswap(key, val))
return;
@@ -514,7 +575,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
meta = meta(rs.getMetaData());
}
catch (SQLException e) {
- throw new IgniteSpiException("Failed to get meta data.", e);
+ throw new IgniteCheckedException("Failed to get meta data.", e);
}
}
@@ -1114,6 +1175,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
log.debug("Starting cache query index...");
System.setProperty("h2.serializeJavaObject", "false");
+ System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
if (SysProperties.serializeJavaObject) {
U.warn(log, "Serialization of Java objects in H2 was enabled.");
@@ -1259,6 +1321,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw new IgniteCheckedException("Cache already registered: " + U.maskName(ccfg.getName()));
createSchema(schema);
+
+ executeStatement(schema, "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME +
+ " NOBUFFER FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
+
createSqlFunctions(schema, ccfg.getSqlFunctionClasses());
}
@@ -1297,6 +1363,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
};
}
+ /** {@inheritDoc} */
+ @Override public MessageFactory messageFactory() {
+ return new GridH2ValueMessageFactory();
+ }
+
/**
* Wrapper to store connection and flag is schema set or not.
*/
@@ -1341,6 +1412,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
+ /** {@inheritDoc} */
+ @Override public boolean isSqlType(Class<?> cls) {
+ switch (DBTypeEnum.fromClass(cls)) {
+ case OTHER:
+ case ARRAY:
+ return false;
+
+ default:
+ return true;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isGeometryClass(Class<?> cls) {
+ return DataType.isGeometryClass(cls);
+ }
+
/**
* Enum that helps to map java types to database types.
*/
@@ -1551,7 +1639,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (type().valueClass() == String.class) {
try {
- luceneIdx = new GridLuceneIndex(ctx, marshaller, schema.offheap, schema.spaceName, type, true);
+ luceneIdx = new GridLuceneIndex(ctx, schema.offheap, schema.spaceName, type);
}
catch (IgniteCheckedException e1) {
throw new IgniteException(e1);
@@ -1564,7 +1652,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (idx.type() == FULLTEXT) {
try {
- luceneIdx = new GridLuceneIndex(ctx, marshaller, schema.offheap, schema.spaceName, type, true);
+ luceneIdx = new GridLuceneIndex(ctx, schema.offheap, schema.spaceName, type);
}
catch (IgniteCheckedException e1) {
throw new IgniteException(e1);
@@ -1907,7 +1995,75 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public GridH2Row createRow(Object key, @Nullable Object val, long expirationTime)
+ @Override public Value wrap(Object obj, int type) throws IgniteCheckedException {
+ assert obj != null;
+
+ if (obj instanceof CacheObject) { // Handle cache object.
+ CacheObject co = (CacheObject)obj;
+
+ if (type == Value.JAVA_OBJECT)
+ return new GridH2ValueCacheObject(cacheContext(schema.spaceName), co);
+
+ obj = co.value(objectContext(schema.spaceName), false);
+ }
+
+ switch (type) {
+ case Value.BOOLEAN:
+ return ValueBoolean.get((Boolean)obj);
+ case Value.BYTE:
+ return ValueByte.get((Byte)obj);
+ case Value.SHORT:
+ return ValueShort.get((Short)obj);
+ case Value.INT:
+ return ValueInt.get((Integer)obj);
+ case Value.FLOAT:
+ return ValueFloat.get((Float)obj);
+ case Value.LONG:
+ return ValueLong.get((Long)obj);
+ case Value.DOUBLE:
+ return ValueDouble.get((Double)obj);
+ case Value.UUID:
+ UUID uuid = (UUID)obj;
+ return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+ case Value.DATE:
+ return ValueDate.get((Date)obj);
+ case Value.TIME:
+ return ValueTime.get((Time)obj);
+ case Value.TIMESTAMP:
+ if (obj instanceof java.util.Date && !(obj instanceof Timestamp))
+ obj = new Timestamp(((java.util.Date) obj).getTime());
+
+ return GridH2Utils.toValueTimestamp((Timestamp)obj);
+ case Value.DECIMAL:
+ return ValueDecimal.get((BigDecimal)obj);
+ case Value.STRING:
+ return ValueString.get(obj.toString());
+ case Value.BYTES:
+ return ValueBytes.get((byte[])obj);
+ case Value.JAVA_OBJECT:
+ return ValueJavaObject.getNoCopy(obj, null, null);
+ case Value.ARRAY:
+ Object[] arr = (Object[])obj;
+
+ Value[] valArr = new Value[arr.length];
+
+ for (int i = 0; i < arr.length; i++) {
+ Object o = arr[i];
+
+ valArr[i] = o == null ? ValueNull.INSTANCE : wrap(o, DataType.getTypeFromClass(o.getClass()));
+ }
+
+ return ValueArray.get(valArr);
+
+ case Value.GEOMETRY:
+ return ValueGeometry.getFromGeometry(obj);
+ }
+
+ throw new IgniteCheckedException("Failed to wrap value[type=" + type + ", value=" + obj + "]");
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridH2Row createRow(CacheObject key, @Nullable CacheObject val, long expirationTime)
throws IgniteCheckedException {
try {
if (val == null) // Only can happen for remove operation, can create simple search row.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 4a0809a..6e95710 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -20,17 +20,14 @@ package org.apache.ignite.internal.processors.query.h2.opt;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.*;
import org.h2.message.*;
import org.h2.result.*;
import org.h2.value.*;
import org.jetbrains.annotations.*;
import java.lang.ref.*;
-import java.math.*;
-import java.sql.Date;
import java.sql.*;
-import java.util.*;
+import java.util.concurrent.*;
/**
* Table row implementation based on {@link GridQueryTypeDescriptor}.
@@ -61,12 +58,16 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
* @param val Value.
* @param valType Value type.
* @param expirationTime Expiration time.
- * @throws IgniteSpiException If failed.
+ * @throws IgniteCheckedException If failed.
*/
protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val,
- int valType, long expirationTime) throws IgniteSpiException {
- super(wrap(key, keyType),
- val == null ? null : wrap(val, valType)); // We remove by key only, so value can be null here.
+ int valType, long expirationTime) throws IgniteCheckedException {
+ super(null, null);
+
+ setValue(KEY_COL, desc.wrap(key, keyType));
+
+ if (val != null) // We remove by key only, so value can be null here.
+ setValue(VAL_COL, desc.wrap(val, valType));
this.desc = desc;
this.expirationTime = expirationTime;
@@ -84,72 +85,6 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
}
/**
- * Wraps object to respective {@link Value}.
- *
- * @param obj Object.
- * @param type Value type.
- * @return Value.
- * @throws IgniteSpiException If failed.
- */
- public static Value wrap(Object obj, int type) throws IgniteSpiException {
- assert obj != null;
-
- switch (type) {
- case Value.BOOLEAN:
- return ValueBoolean.get((Boolean)obj);
- case Value.BYTE:
- return ValueByte.get((Byte)obj);
- case Value.SHORT:
- return ValueShort.get((Short)obj);
- case Value.INT:
- return ValueInt.get((Integer)obj);
- case Value.FLOAT:
- return ValueFloat.get((Float)obj);
- case Value.LONG:
- return ValueLong.get((Long)obj);
- case Value.DOUBLE:
- return ValueDouble.get((Double)obj);
- case Value.UUID:
- UUID uuid = (UUID)obj;
- return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
- case Value.DATE:
- return ValueDate.get((Date)obj);
- case Value.TIME:
- return ValueTime.get((Time)obj);
- case Value.TIMESTAMP:
- if (obj instanceof java.util.Date && !(obj instanceof Timestamp))
- obj = new Timestamp(((java.util.Date) obj).getTime());
-
- return GridH2Utils.toValueTimestamp((Timestamp)obj);
- case Value.DECIMAL:
- return ValueDecimal.get((BigDecimal)obj);
- case Value.STRING:
- return ValueString.get(obj.toString());
- case Value.BYTES:
- return ValueBytes.get((byte[])obj);
- case Value.JAVA_OBJECT:
- return ValueJavaObject.getNoCopy(obj, null, null);
- case Value.ARRAY:
- Object[] arr = (Object[])obj;
-
- Value[] valArr = new Value[arr.length];
-
- for (int i = 0; i < arr.length; i++) {
- Object o = arr[i];
-
- valArr[i] = o == null ? ValueNull.INSTANCE : wrap(o, DataType.getTypeFromClass(o.getClass()));
- }
-
- return ValueArray.get(valArr);
-
- case Value.GEOMETRY:
- return ValueGeometry.getFromGeometry(obj);
- }
-
- throw new IgniteSpiException("Failed to wrap value[type=" + type + ", value=" + obj + "]");
- }
-
- /**
* @return Expiration time of respective cache entry.
*/
public long expirationTime() {
@@ -164,7 +99,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
/**
* Should be called to remove reference on value.
*
- * @throws IgniteSpiException If failed.
+ * @throws IgniteCheckedException If failed.
*/
public synchronized void onSwap() throws IgniteCheckedException {
setValue(VAL_COL, null);
@@ -178,7 +113,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
* @throws IgniteCheckedException If failed.
*/
public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
- setValue(VAL_COL, wrap(val, desc.valueType()));
+ setValue(VAL_COL, desc.wrap(val, desc.valueType()));
notifyAll();
}
@@ -203,20 +138,27 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
}
/**
- * @param attempt Attempt.
+ * @param waitTime Time to await for value unswap.
* @return Synchronized value.
*/
- protected synchronized Value syncValue(int attempt) {
+ protected synchronized Value syncValue(long waitTime) {
Value v = peekValue(VAL_COL);
- if (v == null && attempt != 0) {
+ while (v == null && waitTime > 0) {
+ long start = System.nanoTime(); // This call must be quite rare, so performance is not a concern.
+
try {
- wait(attempt);
+ wait(waitTime); // Wait for value arrival to allow other threads to make a progress.
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
+ long t = System.nanoTime() - start;
+
+ if (t > 0)
+ waitTime -= TimeUnit.NANOSECONDS.toMillis(t);
+
v = peekValue(VAL_COL);
}
@@ -258,7 +200,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
Object valObj = desc.readFromSwap(k);
if (valObj != null) {
- Value upd = wrap(valObj, desc.valueType());
+ Value upd = desc.wrap(valObj, desc.valueType());
v = updateWeakValue(upd);
@@ -277,7 +219,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
if (start == 0)
start = U.currentTimeMillis();
- else if (U.currentTimeMillis() - start > 15_000) // Loop for at most 15 seconds.
+ else if (U.currentTimeMillis() - start > 60_000) // Loop for at most 60 seconds.
throw new IgniteException("Failed to get value for key: " + k +
". This can happen due to a long GC pause.");
}
@@ -317,9 +259,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
return ValueNull.INSTANCE;
try {
- return wrap(res, desc.fieldType(col));
+ return desc.wrap(res, desc.fieldType(col));
}
- catch (IgniteSpiException e) {
+ catch (IgniteCheckedException e) {
throw DbException.convert(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index 9c2c1b2..f89591a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.offheap.unsafe.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.*;
import org.h2.store.*;
import org.h2.value.*;
import org.jetbrains.annotations.*;
@@ -87,10 +86,10 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
* @param val Value.
* @param valType Value type.
* @param expirationTime Expiration time.
- * @throws IgniteSpiException If failed.
+ * @throws IgniteCheckedException If failed.
*/
public GridH2KeyValueRowOffheap(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType,
- long expirationTime) throws IgniteSpiException {
+ long expirationTime) throws IgniteCheckedException {
super(desc, key, keyType, val, valType, expirationTime);
}
@@ -247,7 +246,7 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
Value v = peekValue(VAL_COL);
if (v == null) {
- setValue(VAL_COL, wrap(val, desc.valueType()));
+ setValue(VAL_COL, desc.wrap(val, desc.valueType()));
v = peekValue(VAL_COL);
}
@@ -273,8 +272,8 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
}
/** {@inheritDoc} */
- @Override protected Value syncValue(int attempt) {
- Value v = super.syncValue(attempt);
+ @Override protected Value syncValue(long waitTime) {
+ Value v = super.syncValue(waitTime);
if (v != null)
return v;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
index e998d9b..ee7c79e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.query.h2.opt;
-import org.apache.ignite.spi.*;
+import org.apache.ignite.*;
import org.h2.value.*;
import org.jetbrains.annotations.*;
@@ -35,10 +35,10 @@ public class GridH2KeyValueRowOnheap extends GridH2AbstractKeyValueRow {
* @param val Value.
* @param valType Value type.
* @param expirationTime Expiration time.
- * @throws IgniteSpiException If failed.
+ * @throws IgniteCheckedException If failed.
*/
public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType,
- long expirationTime) throws IgniteSpiException {
+ long expirationTime) throws IgniteCheckedException {
super(desc, key, keyType, val, valType, expirationTime);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index c5f9551..cd65ab3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.processors.query.h2.opt;
import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.query.h2.*;
import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.h2.value.*;
import org.jetbrains.annotations.*;
/**
@@ -40,7 +42,7 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
* @return Row.
* @throws IgniteCheckedException If failed.
*/
- public GridH2Row createRow(Object key, @Nullable Object val, long expirationTime)
+ public GridH2Row createRow(CacheObject key, @Nullable CacheObject val, long expirationTime)
throws IgniteCheckedException;
/**
@@ -97,4 +99,14 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
* @return Guard.
*/
public GridUnsafeGuard guard();
+
+ /**
+ * Wraps object to respective {@link Value}.
+ *
+ * @param o Object.
+ * @param type Value type.
+ * @return Value.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Value wrap(Object o, int type) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 62b3b5e..92991af 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.h2.opt;
import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.offheap.unsafe.*;
import org.h2.api.*;
import org.h2.command.ddl.*;
@@ -105,7 +106,7 @@ public class GridH2Table extends TableBase {
* @return {@code true} If row was found.
* @throws IgniteCheckedException If failed.
*/
- public boolean onSwap(Object key) throws IgniteCheckedException {
+ public boolean onSwap(CacheObject key) throws IgniteCheckedException {
return onSwapUnswap(key, null);
}
@@ -117,7 +118,7 @@ public class GridH2Table extends TableBase {
* @return {@code true} If row was found.
* @throws IgniteCheckedException If failed.
*/
- public boolean onUnswap(Object key, Object val) throws IgniteCheckedException {
+ public boolean onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException {
assert val != null : "Key=" + key;
return onSwapUnswap(key, val);
@@ -132,7 +133,7 @@ public class GridH2Table extends TableBase {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
- private boolean onSwapUnswap(Object key, @Nullable Object val) throws IgniteCheckedException {
+ private boolean onSwapUnswap(CacheObject key, @Nullable CacheObject val) throws IgniteCheckedException {
assert key != null;
GridH2TreeIndex pk = pk();
@@ -300,7 +301,8 @@ public class GridH2Table extends TableBase {
* @return {@code true} If operation succeeded.
* @throws IgniteCheckedException If failed.
*/
- public boolean update(Object key, Object val, long expirationTime, boolean rmv) throws IgniteCheckedException {
+ public boolean update(CacheObject key, CacheObject val, long expirationTime, boolean rmv)
+ throws IgniteCheckedException {
assert desc != null;
GridH2Row row = desc.createRow(key, val, expirationTime);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
new file mode 100644
index 0000000..0a37674
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.*;
+import org.h2.message.*;
+import org.h2.util.*;
+import org.h2.value.*;
+
+import java.sql.*;
+
+/**
+ * H2 Value over {@link CacheObject}. Replacement for {@link ValueJavaObject}.
+ */
+public class GridH2ValueCacheObject extends Value {
+ /** */
+ private CacheObject obj;
+
+ /** */
+ private GridCacheContext<?,?> cctx;
+
+ /**
+ * @param cctx Cache context.
+ * @param obj Object.
+ */
+ public GridH2ValueCacheObject(GridCacheContext<?,?> cctx, CacheObject obj) {
+ assert obj != null;
+
+ this.obj = obj;
+ this.cctx = cctx; // Allowed to be null in tests.
+ }
+
+ /**
+ * @return Cache object.
+ */
+ public CacheObject getCacheObject() {
+ return obj;
+ }
+
+ /**
+ * @return Cache context.
+ */
+ public GridCacheContext<?,?> getCacheContext() {
+ return cctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSQL() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getType() {
+ return Value.JAVA_OBJECT;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getPrecision() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getDisplaySize() {
+ return 64;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getString() {
+ return getObject().toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] getBytes() {
+ return Utils.cloneByteArray(getBytesNoCopy());
+ }
+
+ /**
+ * @return Cache object context.
+ */
+ private CacheObjectContext objectContext() {
+ return cctx == null ? null : cctx.cacheObjectContext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] getBytesNoCopy() {
+ if (obj.type() == CacheObject.TYPE_REGULAR) {
+ // Result must be the same as `marshaller.marshall(obj.value(coctx, false));`
+ try {
+ return obj.valueBytes(objectContext());
+ }
+ catch (IgniteCheckedException e) {
+ throw DbException.convert(e);
+ }
+ }
+
+ // For portables and byte array cache object types.
+ return Utils.serialize(obj.value(objectContext(), false), null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object getObject() {
+ return obj.value(objectContext(), false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void set(PreparedStatement prep, int parameterIndex) throws SQLException {
+ prep.setObject(parameterIndex, getObject(), Types.JAVA_OBJECT);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected int compareSecure(Value v, CompareMode mode) {
+ Object o1 = getObject();
+ Object o2 = v.getObject();
+
+ boolean o1Comparable = o1 instanceof Comparable;
+ boolean o2Comparable = o2 instanceof Comparable;
+
+ if (o1Comparable && o2Comparable &&
+ Utils.haveCommonComparableSuperclass(o1.getClass(), o2.getClass())) {
+ Comparable<Object> c1 = (Comparable<Object>)o1;
+
+ return c1.compareTo(o2);
+ }
+
+ // Group by types.
+ if (o1.getClass() != o2.getClass()) {
+ if (o1Comparable != o2Comparable)
+ return o1Comparable ? -1 : 1;
+
+ return o1.getClass().getName().compareTo(o2.getClass().getName());
+ }
+
+ // Compare hash codes.
+ int h1 = hashCode();
+ int h2 = v.hashCode();
+
+ if (h1 == h2) {
+ if (o1.equals(o2))
+ return 0;
+
+ return Utils.compareNotNullSigned(getBytesNoCopy(), v.getBytesNoCopy());
+ }
+
+ return h1 > h2 ? 1 : -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return getObject().hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object other) {
+ if (!(other instanceof Value))
+ return false;
+
+ Value otherVal = (Value)other;
+
+ return otherVal.getType() == Value.JAVA_OBJECT
+ && getObject().equals(otherVal.getObject());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Value convertPrecision(long precision, boolean force) {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMemory() {
+ return 0;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index c6d597f..f2f11be 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.query.h2.opt;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.offheap.unsafe.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
import org.apache.ignite.spi.indexing.*;
import org.apache.lucene.analysis.standard.*;
import org.apache.lucene.document.*;
@@ -33,6 +34,7 @@ import org.apache.lucene.index.*;
import org.apache.lucene.queryParser.*;
import org.apache.lucene.search.*;
import org.apache.lucene.util.*;
+import org.h2.util.*;
import org.jetbrains.annotations.*;
import java.io.*;
@@ -55,9 +57,6 @@ public class GridLuceneIndex implements Closeable {
public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";
/** */
- private final Marshaller marshaller;
-
- /** */
private final String spaceName;
/** */
@@ -70,9 +69,6 @@ public class GridLuceneIndex implements Closeable {
private final String[] idxdFields;
/** */
- private final boolean storeVal;
-
- /** */
private final AtomicLong updateCntr = new GridAtomicLong();
/** */
@@ -85,20 +81,16 @@ public class GridLuceneIndex implements Closeable {
* Constructor.
*
* @param ctx Kernal context.
- * @param marshaller Indexing marshaller.
* @param mem Unsafe memory.
* @param spaceName Space name.
* @param type Type descriptor.
- * @param storeVal Store value in index.
* @throws IgniteCheckedException If failed.
*/
- public GridLuceneIndex(GridKernalContext ctx, Marshaller marshaller, @Nullable GridUnsafeMemory mem,
- @Nullable String spaceName, GridQueryTypeDescriptor type, boolean storeVal) throws IgniteCheckedException {
+ public GridLuceneIndex(GridKernalContext ctx, @Nullable GridUnsafeMemory mem,
+ @Nullable String spaceName, GridQueryTypeDescriptor type) throws IgniteCheckedException {
this.ctx = ctx;
- this.marshaller = marshaller;
this.spaceName = spaceName;
this.type = type;
- this.storeVal = storeVal;
dir = new GridLuceneDirectory(mem == null ? new GridUnsafeMemory(0) : mem);
@@ -137,15 +129,30 @@ public class GridLuceneIndex implements Closeable {
}
/**
+ * @return Cache object context.
+ */
+ private CacheObjectContext objectContext() {
+ if (ctx == null)
+ return null;
+
+ return ctx.cache().internalCache(spaceName).context().cacheObjectContext();
+ }
+
+ /**
* Stores given data in this fulltext index.
*
- * @param key Key.
- * @param val Value.
+ * @param k Key.
+ * @param v Value.
* @param ver Version.
* @param expires Expiration time.
* @throws IgniteCheckedException If failed.
*/
- public void store(Object key, Object val, byte[] ver, long expires) throws IgniteCheckedException {
+ public void store(CacheObject k, CacheObject v, byte[] ver, long expires) throws IgniteCheckedException {
+ CacheObjectContext coctx = objectContext();
+
+ Object key = k.value(coctx, false);
+ Object val = v.value(coctx, false);
+
Document doc = new Document();
boolean stringsFound = false;
@@ -166,7 +173,7 @@ public class GridLuceneIndex implements Closeable {
}
}
- String keyStr = org.apache.commons.codec.binary.Base64.encodeBase64String(marshaller.marshal(key));
+ String keyStr = org.apache.commons.codec.binary.Base64.encodeBase64String(k.valueBytes(coctx));
try {
// Delete first to avoid duplicates.
@@ -177,8 +184,8 @@ public class GridLuceneIndex implements Closeable {
doc.add(new Field(KEY_FIELD_NAME, keyStr, Field.Store.YES, Field.Index.NOT_ANALYZED));
- if (storeVal && type.valueClass() != String.class)
- doc.add(new Field(VAL_FIELD_NAME, marshaller.marshal(val)));
+ if (type.valueClass() != String.class)
+ doc.add(new Field(VAL_FIELD_NAME, v.valueBytes(coctx)));
doc.add(new Field(VER_FIELD_NAME, ver));
@@ -201,9 +208,10 @@ public class GridLuceneIndex implements Closeable {
* @param key Key.
* @throws IgniteCheckedException If failed.
*/
- public void remove(Object key) throws IgniteCheckedException {
+ public void remove(CacheObject key) throws IgniteCheckedException {
try {
- writer.deleteDocuments(new Term(KEY_FIELD_NAME, org.apache.commons.codec.binary.Base64.encodeBase64String(marshaller.marshal(key))));
+ writer.deleteDocuments(new Term(KEY_FIELD_NAME,
+ org.apache.commons.codec.binary.Base64.encodeBase64String(key.valueBytes(objectContext()))));
}
catch (IOException e) {
throw new IgniteCheckedException(e);
@@ -297,6 +305,9 @@ public class GridLuceneIndex implements Closeable {
/** */
private IgniteBiTuple<K, V> curr;
+ /** */
+ private CacheObjectContext coctx;
+
/**
* Constructor.
*
@@ -313,6 +324,8 @@ public class GridLuceneIndex implements Closeable {
this.docs = docs;
this.filters = filters;
+ coctx = objectContext();
+
findNext();
}
@@ -328,10 +341,25 @@ public class GridLuceneIndex implements Closeable {
}
/**
+ * @param bytes Bytes.
+ * @param ldr Class loader.
+ * @return Object.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
+ if (coctx == null) // For tests.
+ return (Z)Utils.deserialize(bytes, null);
+
+ return (Z)coctx.processor().unmarshal(coctx, bytes, ldr);
+ }
+
+ /**
* Finds next element.
*
* @throws IgniteCheckedException If failed.
*/
+ @SuppressWarnings("unchecked")
private void findNext() throws IgniteCheckedException {
curr = null;
@@ -345,26 +373,22 @@ public class GridLuceneIndex implements Closeable {
throw new IgniteCheckedException(e);
}
- String keyStr = doc.get(KEY_FIELD_NAME);
-
ClassLoader ldr = null;
if (ctx != null && ctx.deploy().enabled())
ldr = ctx.cache().internalCache(spaceName).context().deploy().globalLoader();
- K k = marshaller.unmarshal(org.apache.commons.codec.binary.Base64.decodeBase64(keyStr), ldr);
+ K k = unmarshall(org.apache.commons.codec.binary.Base64.decodeBase64(doc.get(KEY_FIELD_NAME)), ldr);
- byte[] valBytes = doc.getBinaryValue(VAL_FIELD_NAME);
+ V v = type.valueClass() == String.class ?
+ (V)doc.get(VAL_STR_FIELD_NAME) :
+ this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME), ldr);
- V v = valBytes != null ? marshaller.<V>unmarshal(valBytes, ldr) :
- type.valueClass() == String.class ?
- (V)doc.get(VAL_STR_FIELD_NAME): null;
+ assert v != null;
if (!filter(k, v))
continue;
-// byte[] ver = doc.getBinaryValue(VER_FIELD_NAME); TODO rm version
-
curr = new IgniteBiTuple<>(k, v);
break;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
index 7aefbec..a071e73 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
@@ -131,7 +131,7 @@ public enum GridSqlOperationType {
@Override public String getSql(GridSqlOperation operation) {
assert operation.opType().childrenCnt == 2;
- return "(INTERSECTS(" + operation.child(0) + ", " + operation.child(1) + "))";
+ return "(INTERSECTS(" + operation.child(0).getSQL() + ", " + operation.child(1).getSQL() + "))";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
index c40632f..207588e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
@@ -37,6 +37,26 @@ public abstract class GridSqlQuery implements Cloneable {
/** */
protected GridSqlElement limit;
+ /** */
+ private boolean explain;
+
+ /**
+ * @param explain Explain.
+ * @return {@code this}.
+ */
+ public GridSqlQuery explain(boolean explain) {
+ this.explain = explain;
+
+ return this;
+ }
+
+ /**
+ * @return {@code true} If explain.
+ */
+ public boolean explain() {
+ return explain;
+ }
+
/**
* @return Offset.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 62ec72f..74e4748 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -158,6 +158,9 @@ public class GridSqlQueryParser {
private static final Getter<SelectUnion, SortOrder> UNION_SORT = getter(SelectUnion.class, "sort");
/** */
+ private static final Getter<Explain, Prepared> EXPLAIN_COMMAND = getter(Explain.class, "command");
+
+ /** */
private static volatile Getter<Command,Prepared> prepared;
/** */
@@ -182,7 +185,7 @@ public class GridSqlQueryParser {
Prepared statement = p.get(cmd);
- return new GridSqlQueryParser().parse((Query)statement);
+ return new GridSqlQueryParser().parse(statement);
}
/**
@@ -321,13 +324,16 @@ public class GridSqlQueryParser {
/**
* @param qry Select.
*/
- public GridSqlQuery parse(Query qry) {
+ public GridSqlQuery parse(Prepared qry) {
if (qry instanceof Select)
return parse((Select)qry);
if (qry instanceof SelectUnion)
return parse((SelectUnion)qry);
+ if (qry instanceof Explain)
+ return parse(EXPLAIN_COMMAND.get((Explain)qry)).explain(true);
+
throw new UnsupportedOperationException("Unknown query type: " + qry);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 73d7751..b1d8913 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -78,7 +78,7 @@ public class GridSqlQuerySplitter {
if (params == null)
params = GridCacheSqlQuery.EMPTY_PARAMS;
- GridSqlQuery qry0 = GridSqlQueryParser.parse(stmt);
+ final GridSqlQuery qry0 = GridSqlQueryParser.parse(stmt);
GridSqlSelect srcQry;
@@ -87,6 +87,8 @@ public class GridSqlQuerySplitter {
else { // Handle UNION.
srcQry = new GridSqlSelect().from(new GridSqlSubquery(qry0));
+ srcQry.explain(qry0.explain());
+
GridSqlSelect left = leftest(qry0);
int c = 0;
@@ -127,7 +129,10 @@ public class GridSqlQuerySplitter {
// Create map and reduce queries.
GridSqlSelect mapQry = srcQry.clone();
- GridSqlSelect rdcQry = new GridSqlSelect().from(new GridSqlFunction("PUBLIC", TABLE_FUNC_NAME)); // table(mergeTable)); TODO
+
+ mapQry.explain(false);
+
+ GridSqlSelect rdcQry = new GridSqlSelect().from(new GridSqlFunction(null, TABLE_FUNC_NAME)); // table(mergeTable)); TODO
// Split all select expressions into map-reduce parts.
List<GridSqlElement> mapExps = F.addAll(
@@ -213,6 +218,8 @@ public class GridSqlQuerySplitter {
res.addMapQuery(mergeTable, mapQry.getSQL(),
findParams(mapQry, params, new ArrayList<>(params.length)).toArray());
+ res.explain(qry0.explain());
+
return res;
}
@@ -281,6 +288,10 @@ public class GridSqlQuerySplitter {
while (target.size() < idx)
target.add(null);
+ if (params.length <= idx)
+ throw new IgniteException("Invalid number of query parameters. " +
+ "Cannot find " + idx + " parameter.");
+
Object param = params[idx];
if (idx == target.size())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
index 0786eac..9972bba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
@@ -68,7 +68,7 @@ public class GridSqlSelect extends GridSqlQuery {
/** {@inheritDoc} */
@Override public String getSQL() {
- StatementBuilder buff = new StatementBuilder("SELECT");
+ StatementBuilder buff = new StatementBuilder(explain() ? "EXPLAIN SELECT" : "SELECT");
if (distinct)
buff.append(" DISTINCT");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
index 23c18ed..96beb6b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
@@ -47,7 +47,7 @@ public class GridSqlUnion extends GridSqlQuery {
/** {@inheritDoc} */
@Override public String getSQL() {
- StatementBuilder buff = new StatementBuilder();
+ StatementBuilder buff = new StatementBuilder(explain() ? "EXPLAIN \n" : "");
buff.append('(').append(left.getSQL()).append(')');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 918a541..f15a2da 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -30,6 +30,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
import org.h2.jdbc.*;
import org.h2.result.*;
import org.h2.store.*;
@@ -44,6 +46,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
/**
* Map query executor.
@@ -138,6 +141,9 @@ public class GridMapQueryExecutor {
ClusterNode node = ctx.discovery().node(nodeId);
+ if (node == null)
+ return; // Node left, ignore.
+
boolean processed = true;
if (msg instanceof GridQueryRequest)
@@ -203,7 +209,14 @@ public class GridMapQueryExecutor {
Collection<GridCacheSqlQuery> qrys;
try {
- qrys = req.queries(ctx.config().getMarshaller());
+ qrys = req.queries();
+
+ if (!node.isLocal()) {
+ Marshaller m = ctx.config().getMarshaller();
+
+ for (GridCacheSqlQuery qry : qrys)
+ qry.unmarshallParams(m);
+ }
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -344,7 +357,10 @@ public class GridMapQueryExecutor {
boolean loc = node.isLocal();
GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
- page == 0 ? res.rowCount : -1, loc ? null : marshallRows(rows), loc ? rows : null);
+ page == 0 ? res.rowCount : -1 ,
+ res.cols,
+ loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
+ loc ? rows : null);
if (loc)
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
@@ -510,6 +526,9 @@ public class GridMapQueryExecutor {
private final UUID qrySrcNodeId;
/** */
+ private final int cols;
+
+ /** */
private int page;
/** */
@@ -538,6 +557,7 @@ public class GridMapQueryExecutor {
}
rowCount = res.getRowCount();
+ cols = res.getVisibleColumnCount();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 407341e..9136821 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -96,14 +96,14 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param nodeId Node ID.
*/
public void fail(UUID nodeId) {
- addPage0(new GridResultPage(nodeId, null, false));
+ addPage0(new GridResultPage(null, nodeId, null, false));
}
/**
* @param page Page.
*/
public final void addPage(GridResultPage page) {
- int pageRowsCnt = page.rows().size();
+ int pageRowsCnt = page.rowsInPage();
if (pageRowsCnt != 0)
addPage0(page);
@@ -137,7 +137,7 @@ public abstract class GridMergeIndex extends BaseIndex {
if (last)
last = lastSubmitted.compareAndSet(false, true);
- addPage0(new GridResultPage(page.source(), null, last));
+ addPage0(new GridResultPage(null, page.source(), null, last));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 93c9482..76a52e9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -44,7 +44,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
/** {@inheritDoc} */
@Override protected void addPage0(GridResultPage page) {
- if (!page.rows().isEmpty() || page.isLast() || queue.isEmpty())
+ if (page.rowsInPage() != 0 || page.isLast() || queue.isEmpty())
queue.add(page);
}
@@ -75,7 +75,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
fetchNextPage(page);
- iter = page.rows().iterator();
+ iter = page.rows();
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 7f42e0d..50c30a5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -28,12 +28,13 @@ import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.processors.query.h2.*;
-import org.apache.ignite.internal.processors.query.h2.sql.*;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
+import org.h2.command.*;
import org.h2.command.ddl.*;
import org.h2.command.dml.Query;
import org.h2.engine.*;
@@ -151,9 +152,6 @@ public class GridReduceQueryExecutor {
}
}
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
-
- h2.executeStatement("PUBLIC", "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME +
- " NOBUFFER FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
}
/**
@@ -166,6 +164,9 @@ public class GridReduceQueryExecutor {
ClusterNode node = ctx.discovery().node(nodeId);
+ if (node == null)
+ return; // Node left, ignore.
+
boolean processed = true;
if (msg instanceof GridQueryNextPageResponse)
@@ -230,7 +231,7 @@ public class GridReduceQueryExecutor {
GridResultPage page;
try {
- page = new GridResultPage(node.id(), msg, false) {
+ page = new GridResultPage(ctx, node.id(), msg, false) {
@Override public void fetchNextPage() {
if (r.rmtErr != null)
throw new CacheException("Next page fetch failed.", r.rmtErr);
@@ -279,15 +280,16 @@ public class GridReduceQueryExecutor {
String space = cctx.name();
- r.conn = h2.connectionForSpace(space);
+ r.conn = (JdbcConnection)h2.connectionForSpace(space);
- // TODO Add topology version.
+ // TODO Add topology version.
ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
- if (cctx.isReplicated()) {
- assert dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
+ if (cctx.isReplicated() || qry.explain()) {
+ assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
- dataNodes = dataNodes.forRandom(); // Select random data node to run query on a replicated data.
+ // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
+ dataNodes = dataNodes.forRandom();
}
final Collection<ClusterNode> nodes = dataNodes.nodes();
@@ -296,7 +298,7 @@ public class GridReduceQueryExecutor {
GridMergeTable tbl;
try {
- tbl = createFunctionTable((JdbcConnection)r.conn, mapQry); // createTable(r.conn, mapQry); TODO
+ tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -317,14 +319,32 @@ public class GridReduceQueryExecutor {
runs.put(qryReqId, r);
try {
- send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, qry.mapQueries(),
- ctx.config().getMarshaller().marshal(qry.mapQueries())));
+ Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+
+ if (qry.explain()) {
+ mapQrys = new ArrayList<>(qry.mapQueries().size());
+
+ for (GridCacheSqlQuery mapQry : qry.mapQueries())
+ mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
+ }
+
+ if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
+ Marshaller m = ctx.config().getMarshaller();
+
+ for (GridCacheSqlQuery mapQry : mapQrys)
+ mapQry.marshallParams(m);
+ }
+
+ send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys));
r.latch.await();
if (r.rmtErr != null)
throw new CacheException("Failed to run map query remotely.", r.rmtErr);
+ if (qry.explain())
+ return explainPlan(r.conn, space, qry);
+
GridCacheSqlQuery rdc = qry.reduceQuery();
final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
@@ -355,6 +375,55 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param c Connection.
+ * @param space Space.
+ * @param qry Query.
+ * @return Cursor for plans.
+ * @throws IgniteCheckedException if failed.
+ */
+ private QueryCursor<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
+ throws IgniteCheckedException {
+ List<List<?>> lists = new ArrayList<>();
+
+ for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+ ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + mapQry.alias(), null);
+
+ lists.add(F.asList(getPlan(rs)));
+ }
+
+ for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+ GridMergeTable tbl = createFunctionTable(c, mapQry, false);
+
+ curFunTbl.set(tbl); // Now it will be only a single table.
+ }
+
+ GridCacheSqlQuery rdc = qry.reduceQuery();
+
+ ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "EXPLAIN " + rdc.query(), F.asList(rdc.parameters()));
+
+ lists.add(F.asList(getPlan(rs)));
+
+ return new QueryCursorImpl<>(lists.iterator());
+ }
+
+ /**
+ * @param rs Result set.
+ * @return Plan.
+ * @throws IgniteCheckedException If failed.
+ */
+ private String getPlan(ResultSet rs) throws IgniteCheckedException {
+ try {
+ if (!rs.next())
+ throw new IllegalStateException();
+
+ return rs.getString(1);
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
* @param nodes Nodes.
* @param msg Message.
* @throws IgniteCheckedException If failed.
@@ -495,10 +564,12 @@ public class GridReduceQueryExecutor {
/**
* @param conn Connection.
* @param qry Query.
+ * @param explain Explain.
* @return Table.
* @throws IgniteCheckedException
*/
- private GridMergeTable createFunctionTable(JdbcConnection conn, GridCacheSqlQuery qry) throws IgniteCheckedException {
+ private GridMergeTable createFunctionTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
+ throws IgniteCheckedException {
try {
Session ses = (Session)conn.getSession();
@@ -508,17 +579,21 @@ public class GridReduceQueryExecutor {
data.schema = ses.getDatabase().getSchema(ses.getCurrentSchemaName());
data.create = true;
- Query prepare = (Query)ses.prepare(qry.query(), false);
+ if (!explain) {
+ Prepared prepare = ses.prepare(qry.query(), false);
- List<org.h2.expression.Parameter> parsedParams = prepare.getParameters();
+ List<org.h2.expression.Parameter> parsedParams = prepare.getParameters();
- for (int i = Math.min(parsedParams.size(), qry.parameters().length); --i >= 0; ) {
- Object val = qry.parameters()[i];
+ for (int i = Math.min(parsedParams.size(), qry.parameters().length); --i >= 0; ) {
+ Object val = qry.parameters()[i];
- parsedParams.get(i).setValue(DataType.convertToValue(ses, val, Value.UNKNOWN));
- }
+ parsedParams.get(i).setValue(DataType.convertToValue(ses, val, Value.UNKNOWN));
+ }
- data.columns = generateColumnsFromQuery(prepare);
+ data.columns = generateColumnsFromQuery((Query)prepare);
+ }
+ else
+ data.columns = planColumns();
return new GridMergeTable(data);
}
@@ -530,6 +605,17 @@ public class GridReduceQueryExecutor {
}
/**
+ * @return Columns.
+ */
+ private static ArrayList<Column> planColumns() {
+ ArrayList<Column> res = new ArrayList<>(1);
+
+ res.add(new Column("PLAN", Value.STRING));
+
+ return res;
+ }
+
+ /**
* @param conn Connection.
* @param qry Query.
* @return Table.
@@ -566,7 +652,7 @@ public class GridReduceQueryExecutor {
private CountDownLatch latch;
/** */
- private Connection conn;
+ private JdbcConnection conn;
/** */
private int pageSize;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 5f58b95..9392fd1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -17,12 +17,18 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
import org.h2.value.*;
+import javax.cache.*;
import java.util.*;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
+
/**
* Page result.
*/
@@ -34,18 +40,22 @@ public class GridResultPage {
protected final GridQueryNextPageResponse res;
/** */
- private final Collection<Value[]> rows;
+ private final int rowsInPage;
/** */
private final boolean last;
+ /** */
+ private Iterator<Value[]> rows;
+
/**
+ * @param ctx Kernal context.
* @param src Source.
* @param res Response.
* @param last If this is the globally last page.
*/
@SuppressWarnings("unchecked")
- public GridResultPage(UUID src, GridQueryNextPageResponse res, boolean last) {
+ public GridResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res, boolean last) {
assert src != null;
this.src = src;
@@ -57,12 +67,53 @@ public class GridResultPage {
// res == null means that it is a terminating dummy page for the given source node ID.
if (res != null) {
- Object plainRows = res.plainRows();
+ Collection<?> plainRows = res.plainRows();
+
+ if (plainRows != null) {
+ rowsInPage = plainRows.size();
+
+ rows = (Iterator<Value[]>)plainRows.iterator();
+ }
+ else {
+ final int cols = res.columns();
+
+ rowsInPage = res.values().size() / cols;
+
+ final Iterator<Message> valsIter = res.values().iterator();
+
+ rows = new Iterator<Value[]>() {
+ /** */
+ int rowIdx;
+
+ @Override public boolean hasNext() {
+ return rowIdx < rowsInPage;
+ }
+
+ @Override public Value[] next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ rowIdx++;
- rows = plainRows != null ? (Collection<Value[]>)plainRows : GridMapQueryExecutor.unmarshallRows(res.rows());
+ try {
+ return fillArray(valsIter, new Value[cols], ctx);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
+ else {
+ rowsInPage = 0;
+
+ rows = Collections.emptyIterator();
}
- else
- rows = Collections.emptySet();
}
/**
@@ -73,10 +124,23 @@ public class GridResultPage {
}
/**
+ * @return Number on rows in this page.
+ */
+ public int rowsInPage() {
+ return rowsInPage;
+ }
+
+ /**
* @return Rows.
*/
- public Collection<Value[]> rows() {
- return rows;
+ public Iterator<Value[]> rows() {
+ Iterator<Value[]> r = rows;
+
+ assert r != null;
+
+ rows = null;
+
+ return r;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
new file mode 100644
index 0000000..26fd81d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.h2.value.*;
+
+import java.nio.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
+
+/**
+ * H2 Array.
+ */
+public class GridH2Array extends GridH2ValueMessage {
+ /** */
+ @GridDirectCollection(Message.class)
+ private Collection<Message> x;
+
+ /**
+ *
+ */
+ public GridH2Array() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridH2Array(Value val) throws IgniteCheckedException {
+ assert val.getType() == Value.ARRAY : val.getType();
+
+ ValueArray arr = (ValueArray)val;
+
+ x = new ArrayList<>(arr.getList().length);
+
+ for (Value v : arr.getList())
+ x.add(toMessage(v));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Value value(GridKernalContext ctx) throws IgniteCheckedException {
+ return ValueArray.get(fillArray(x.iterator(), new Value[x.size()], ctx));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeCollection("x", x, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ x = reader.readCollection("x", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -18;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
new file mode 100644
index 0000000..ec4e455
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.h2.value.*;
+
+import java.nio.*;
+
+/**
+ * H2 Boolean.
+ */
+public class GridH2Boolean extends GridH2ValueMessage {
+ /** */
+ private boolean x;
+
+ /**
+ *
+ */
+ public GridH2Boolean() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ */
+ public GridH2Boolean(Value val) {
+ assert val.getType() == Value.BOOLEAN : val.getType();
+
+ x = val.getBoolean();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Value value(GridKernalContext ctx) {
+ return ValueBoolean.get(x);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeBoolean("x", x))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ x = reader.readBoolean("x");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -5;
+ }
+
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
new file mode 100644
index 0000000..e7c8f33
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.h2.value.*;
+
+import java.nio.*;
+
+/**
+ * H2 Byte.
+ */
+public class GridH2Byte extends GridH2ValueMessage {
+ /** */
+ private byte x;
+
+ /**
+ *
+ */
+ public GridH2Byte() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ */
+ public GridH2Byte(Value val) {
+ assert val.getType() == Value.BYTE : val.getType();
+
+ x = val.getByte();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Value value(GridKernalContext ctx) {
+ return ValueByte.get(x);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByte("x", x))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ x = reader.readByte("x");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -6;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+}