You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/05 12:38:11 UTC
[07/52] ignite git commit: IGNITE-2294: Implemented DML.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
index e15e770..b80f573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
@@ -23,12 +23,13 @@ import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryArrayIdentityResolver;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryIdentityResolver;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -50,7 +51,7 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
/**
* @return {@code True} if object is array based.
*/
- protected abstract boolean hasArray();
+ public abstract boolean hasArray();
/**
* @return Object array if object is array based, otherwise {@code null}.
@@ -81,22 +82,29 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
*
* @return Field value.
*/
- @Nullable protected abstract int dataStartOffset();
+ public abstract int dataStartOffset();
/**
* Get offset of the footer begin.
*
* @return Field value.
*/
- @Nullable protected abstract int footerStartOffset();
+ public abstract int footerStartOffset();
/**
* Get field by offset.
*
- * @param order Field order.
+ * @param order Field offset.
* @return Field value.
*/
- @Nullable protected abstract <F> F fieldByOrder(int order);
+ @Nullable public abstract <F> F fieldByOrder(int order);
+
+ /**
+ * Create field comparer.
+ *
+ * @return Comparer.
+ */
+ public abstract BinarySerializedFieldComparator createFieldComparator();
/**
* @param ctx Reader context.
@@ -106,18 +114,30 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
@Nullable protected abstract <F> F field(BinaryReaderHandles ctx, String fieldName);
/**
+ * @return {@code True} if object has schema.
+ */
+ public abstract boolean hasSchema();
+
+ /**
* Get schema ID.
*
* @return Schema ID.
*/
- protected abstract int schemaId();
+ public abstract int schemaId();
/**
* Create schema for object.
*
* @return Schema.
*/
- protected abstract BinarySchema createSchema();
+ public abstract BinarySchema createSchema();
+
+ /**
+ * Get binary context.
+ *
+ * @return Binary context.
+ */
+ public abstract BinaryContext context();
/** {@inheritDoc} */
@Override public BinaryObjectBuilder toBuilder() throws BinaryObjectException {
@@ -134,58 +154,15 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
if (other == this)
return true;
- if (other == null)
- return false;
-
- if (!(other instanceof BinaryObjectExImpl))
- return false;
-
- BinaryObjectExImpl other0 = (BinaryObjectExImpl)other;
-
- if (typeId() != other0.typeId())
- return false;
-
- int start = dataStartOffset();
- int end = footerStartOffset();
-
- int otherStart = other0.dataStartOffset();
- int otherEnd = other0.footerStartOffset();
-
- int len = end - start;
-
- if (len != otherEnd - otherStart)
+ if (!(other instanceof BinaryObject))
return false;
- if (hasArray()) {
- byte[] arr = array();
+ BinaryIdentityResolver identity = context().identity(typeId());
- if (other0.hasArray()) {
- byte[] otherArr = other0.array();
+ if (identity == null)
+ identity = BinaryArrayIdentityResolver.instance();
- for (int i = start, j = otherStart; i < end; i++, j++) {
- if (arr[i] != otherArr[j])
- return false;
- }
-
- return true;
- }
- else {
- assert other0.offheapAddress() > 0;
-
- return GridUnsafeMemory.compare(other0.offheapAddress() + otherStart, arr, start, len);
- }
- }
- else {
- assert offheapAddress() > 0;
-
- if (other0.hasArray())
- return GridUnsafeMemory.compare(offheapAddress() + start, other0.array(), otherStart, len);
- else {
- assert other0.offheapAddress() > 0;
-
- return GridUnsafeMemory.compare(offheapAddress() + start, other0.offheapAddress() + otherStart, len);
- }
- }
+ return identity.equals(this, (BinaryObject)other);
}
/** {@inheritDoc} */
@@ -250,6 +227,7 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
* @param ctx Reader context.
* @param handles Handles for already traversed objects.
*/
+ @SuppressWarnings("unchecked")
private void appendValue(Object val, SB buf, BinaryReaderHandles ctx,
IdentityHashMap<BinaryObject, Integer> handles) {
if (val instanceof byte[])
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 54a7c08..360c71a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -211,10 +211,8 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
this.detachAllowed = detachAllowed;
}
- /**
- * @return Context.
- */
- public BinaryContext context() {
+ /** {@inheritDoc} */
+ @Override public BinaryContext context() {
return ctx;
}
@@ -241,7 +239,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
}
/** {@inheritDoc} */
- @Override protected boolean hasArray() {
+ @Override public boolean hasArray() {
return true;
}
@@ -296,20 +294,34 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
}
/** {@inheritDoc} */
- @Nullable @Override protected int dataStartOffset() {
+ @Override public BinarySerializedFieldComparator createFieldComparator() {
+ int schemaOff = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
+
+ short flags = BinaryPrimitives.readShort(arr, start + GridBinaryMarshaller.FLAGS_POS);
+
+ int fieldIdLen = BinaryUtils.isCompactFooter(flags) ? 0 : BinaryUtils.FIELD_ID_LEN;
+ int fieldOffLen = BinaryUtils.fieldOffsetLength(flags);
+
+ int orderBase = start + schemaOff + fieldIdLen;
+ int orderMultiplier = fieldIdLen + fieldOffLen;
+
+ return new BinarySerializedFieldComparator(this, arr, 0L, start, orderBase, orderMultiplier, fieldOffLen);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int dataStartOffset() {
int typeId = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.TYPE_ID_POS);
if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) {
int len = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.DFLT_HDR_LEN + 1);
return start + GridBinaryMarshaller.DFLT_HDR_LEN + len + 5;
- }
- else
+ } else
return start + GridBinaryMarshaller.DFLT_HDR_LEN;
}
/** {@inheritDoc} */
- @Nullable @Override protected int footerStartOffset() {
+ @Override public int footerStartOffset() {
short flags = BinaryPrimitives.readShort(arr, start + GridBinaryMarshaller.FLAGS_POS);
if (!BinaryUtils.hasSchema(flags))
@@ -318,9 +330,12 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
return start + BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
}
- /** {@inheritDoc} */
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override protected <F> F fieldByOrder(int order) {
+ @Nullable @Override public <F> F fieldByOrder(int order) {
+ if (order == BinarySchema.ORDER_NOT_FOUND)
+ return null;
+
Object val;
// Calculate field position.
@@ -490,12 +505,19 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
}
/** {@inheritDoc} */
- @Override protected int schemaId() {
+ @Override public boolean hasSchema() {
+ short flags = BinaryPrimitives.readShort(arr, start + GridBinaryMarshaller.FLAGS_POS);
+
+ return BinaryUtils.hasSchema(flags);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int schemaId() {
return BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_ID_POS);
}
/** {@inheritDoc} */
- @Override protected BinarySchema createSchema() {
+ @Override public BinarySchema createSchema() {
return reader(null, false).getOrCreateSchema();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
index 7550b19..354ac11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
@@ -96,7 +96,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) {
int off = start + GridBinaryMarshaller.DFLT_HDR_LEN;
- String clsName = BinaryUtils.doReadClassName(new BinaryOffheapInputStream(ptr + off, size));
+ String clsName = BinaryUtils.doReadClassName(new BinaryOffheapInputStream(ptr + off, size));
typeId = ctx.typeId(clsName);
}
@@ -115,16 +115,28 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
}
/** {@inheritDoc} */
- @Override protected int schemaId() {
+ @Override public boolean hasSchema() {
+ short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
+
+ return BinaryUtils.hasSchema(flags);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int schemaId() {
return BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.SCHEMA_ID_POS);
}
/** {@inheritDoc} */
- @Override protected BinarySchema createSchema() {
+ @Override public BinarySchema createSchema() {
return reader(null, false).getOrCreateSchema();
}
/** {@inheritDoc} */
+ @Override public BinaryContext context() {
+ return ctx;
+ }
+
+ /** {@inheritDoc} */
@Override public int start() {
return start;
}
@@ -140,7 +152,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
}
/** {@inheritDoc} */
- @Override protected boolean hasArray() {
+ @Override public boolean hasArray() {
return false;
}
@@ -174,7 +186,22 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
}
/** {@inheritDoc} */
- @Nullable @Override protected int dataStartOffset() {
+ @Override public BinarySerializedFieldComparator createFieldComparator() {
+ int schemaOff = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
+
+ short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
+
+ int fieldIdLen = BinaryUtils.isCompactFooter(flags) ? 0 : BinaryUtils.FIELD_ID_LEN;
+ int fieldOffLen = BinaryUtils.fieldOffsetLength(flags);
+
+ int orderBase = start + schemaOff + fieldIdLen;
+ int orderMultiplier = fieldIdLen + fieldOffLen;
+
+ return new BinarySerializedFieldComparator(this, null, ptr, start, orderBase, orderMultiplier, fieldOffLen);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int dataStartOffset() {
int typeId = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.TYPE_ID_POS);
if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) {
@@ -186,7 +213,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
}
/** {@inheritDoc} */
- @Nullable @Override protected int footerStartOffset() {
+ @Override public int footerStartOffset() {
short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
if (!BinaryUtils.hasSchema(flags))
@@ -197,7 +224,10 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override protected <F> F fieldByOrder(int order) {
+ @Nullable @Override public <F> F fieldByOrder(int order) {
+ if (order == BinarySchema.ORDER_NOT_FOUND)
+ return null;
+
Object val;
// Calculate field position.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java
index 8b82fad..86f5040 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java
@@ -120,6 +120,18 @@ public abstract class BinaryPrimitives {
}
/**
+ * @param ptr Pointer.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void writeShort(long ptr, int off, short val) {
+ if (BIG_ENDIAN)
+ GridUnsafe.putShortLE(ptr + off, val);
+ else
+ GridUnsafe.putShort(ptr + off, val);
+ }
+
+ /**
* @param arr Array.
* @param off Offset.
* @return Value.
@@ -228,6 +240,18 @@ public abstract class BinaryPrimitives {
}
/**
+ * @param ptr Pointer.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void writeInt(long ptr, int off, int val) {
+ if (BIG_ENDIAN)
+ GridUnsafe.putIntLE(ptr + off, val);
+ else
+ GridUnsafe.putInt(ptr + off, val);
+ }
+
+ /**
* @param arr Array.
* @param off Offset.
* @return Value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySerializedFieldComparator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySerializedFieldComparator.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySerializedFieldComparator.java
new file mode 100644
index 0000000..130bb0c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySerializedFieldComparator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary;
+
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * Compares fields in serialized form when possible.
+ */
+public class BinarySerializedFieldComparator {
+ /** Position: not found. */
+ private static final int POS_NOT_FOUND = -1;
+
+ /** Original object. */
+ private final BinaryObjectExImpl obj;
+
+ /** Pointer to data (onheap). */
+ private final byte[] arr;
+
+ /** Pointer to data (offheap). */
+ private final long ptr;
+
+ /** Object start offset. */
+ private final int startOff;
+
+ /** Order base. */
+ private final int orderBase;
+
+ /** Order multiplier. */
+ private final int orderMultiplier;
+
+ /** Field offset length. */
+ private final int fieldOffLen;
+
+ /** Current field order. */
+ private int curFieldOrder;
+
+ /** Current field offset. */
+ private int curFieldPos;
+
+ /**
+ * Constructor.
+ *
+ * @param obj Original object.
+ * @param arr Array.
+ * @param ptr Pointer.
+ * @param startOff Start offset.
+ * @param orderBase Order base.
+ * @param orderMultiplier Order multiplier.
+ * @param fieldOffLen Field offset length.
+ */
+ public BinarySerializedFieldComparator(BinaryObjectExImpl obj, byte[] arr, long ptr, int startOff, int orderBase,
+ int orderMultiplier, int fieldOffLen) {
+ assert arr != null && ptr == 0L || arr == null && ptr != 0L;
+
+ this.obj = obj;
+ this.arr = arr;
+ this.ptr = ptr;
+ this.startOff = startOff;
+ this.orderBase = orderBase;
+ this.orderMultiplier = orderMultiplier;
+ this.fieldOffLen = fieldOffLen;
+ }
+
+ /**
+ * Locate the field.
+ *
+ * @param order Field order.
+ */
+ public void findField(int order) {
+ curFieldOrder = order;
+
+ if (order == BinarySchema.ORDER_NOT_FOUND)
+ curFieldPos = POS_NOT_FOUND;
+ else {
+ int pos = orderBase + order * orderMultiplier;
+
+ if (fieldOffLen == BinaryUtils.OFFSET_1) {
+ byte val = offheap() ? BinaryPrimitives.readByte(ptr, pos) : BinaryPrimitives.readByte(arr, pos);
+
+ curFieldPos = startOff + ((int)val & 0xFF);
+ }
+ else if (fieldOffLen == BinaryUtils.OFFSET_2) {
+ short val = offheap() ? BinaryPrimitives.readShort(ptr, pos) : BinaryPrimitives.readShort(arr, pos);
+
+ curFieldPos = startOff + ((int)val & 0xFFFF);
+ }
+ else {
+ int val = offheap() ? BinaryPrimitives.readInt(ptr, pos) : BinaryPrimitives.readInt(arr, pos);
+
+ curFieldPos = startOff + val;
+ }
+ }
+ }
+
+ /**
+ * Get field type.
+ *
+ * @return Field type.
+ */
+ private byte fieldType() {
+ if (curFieldPos == POS_NOT_FOUND)
+ return GridBinaryMarshaller.NULL;
+ else
+ return offheap() ?
+ BinaryPrimitives.readByte(ptr, curFieldPos) : BinaryPrimitives.readByte(arr, curFieldPos);
+ }
+
+ /**
+ * @return Whether this is offheap object.
+ */
+ private boolean offheap() {
+ return ptr != 0L;
+ }
+
+ /**
+ * Get current field.
+ *
+ * @return Current field.
+ */
+ private Object currentField() {
+ return obj.fieldByOrder(curFieldOrder);
+ }
+
+ /**
+ * Read byte value.
+ *
+ * @param off Offset.
+ * @return Value.
+ */
+ private byte readByte(int off) {
+ if (offheap())
+ return BinaryPrimitives.readByte(ptr, curFieldPos + off);
+ else
+ return arr[curFieldPos + off];
+ }
+
+ /**
+ * Read short value.
+ *
+ * @param off Offset.
+ * @return Value.
+ */
+ private short readShort(int off) {
+ if (offheap())
+ return BinaryPrimitives.readShort(ptr, curFieldPos + off);
+ else
+ return BinaryPrimitives.readShort(arr, curFieldPos + off);
+ }
+
+ /**
+ * Read int value.
+ *
+ * @param off Offset.
+ * @return Value.
+ */
+ private int readInt(int off) {
+ if (offheap())
+ return BinaryPrimitives.readInt(ptr, curFieldPos + off);
+ else
+ return BinaryPrimitives.readInt(arr, curFieldPos + off);
+ }
+
+ /**
+ * Read long value.
+ *
+ * @param off Offset.
+ * @return Value.
+ */
+ private long readLong(int off) {
+ if (offheap())
+ return BinaryPrimitives.readLong(ptr, curFieldPos + off);
+ else
+ return BinaryPrimitives.readLong(arr, curFieldPos + off);
+ }
+
+ /**
+ * Compare fields.
+ *
+ * @param c1 First comparer.
+ * @param c2 Second comparer.
+ * @return {@code True} if both fields are equal.
+ */
+ public static boolean equals(BinarySerializedFieldComparator c1, BinarySerializedFieldComparator c2) {
+ // Compare field types.
+ byte typ = c1.fieldType();
+
+ if (typ != c2.fieldType())
+ return false;
+
+ // Switch by type and compare.
+ switch (typ) {
+ case GridBinaryMarshaller.BYTE:
+ case GridBinaryMarshaller.BOOLEAN:
+ return c1.readByte(1) == c2.readByte(1);
+
+ case GridBinaryMarshaller.SHORT:
+ case GridBinaryMarshaller.CHAR:
+ return c1.readShort(1) == c2.readShort(1);
+
+ case GridBinaryMarshaller.INT:
+ case GridBinaryMarshaller.FLOAT:
+ return c1.readInt(1) == c2.readInt(1);
+
+ case GridBinaryMarshaller.LONG:
+ case GridBinaryMarshaller.DOUBLE:
+ case GridBinaryMarshaller.DATE:
+ return c1.readLong(1) == c2.readLong(1);
+
+ case GridBinaryMarshaller.TIMESTAMP:
+ return c1.readLong(1) == c2.readLong(1) && c1.readInt(1 + 8) == c2.readInt(1 + 8);
+
+ case GridBinaryMarshaller.UUID:
+ return c1.readLong(1) == c2.readLong(1) && c1.readLong(1 + 8) == c2.readLong(1 + 8);
+
+ case GridBinaryMarshaller.STRING:
+ return compareByteArrays(c1, c2, 1);
+
+ case GridBinaryMarshaller.DECIMAL:
+ return c1.readInt(1) == c2.readInt(1) && compareByteArrays(c1, c2, 5);
+
+ case GridBinaryMarshaller.NULL:
+ return true;
+
+ default:
+ Object val1 = c1.currentField();
+ Object val2 = c2.currentField();
+
+ return isArray(val1) ? compareArrays(val1, val2) : F.eq(val1, val2);
+ }
+ }
+
+ /**
+ * Compare arrays.
+ *
+ * @param val1 Value 1.
+ * @param val2 Value 2.
+ * @return Result.
+ */
+ private static boolean compareArrays(Object val1, Object val2) {
+ if (val1.getClass() == val2.getClass()) {
+ if (val1 instanceof byte[])
+ return Arrays.equals((byte[])val1, (byte[])val2);
+ else if (val1 instanceof boolean[])
+ return Arrays.equals((boolean[])val1, (boolean[])val2);
+ else if (val1 instanceof short[])
+ return Arrays.equals((short[])val1, (short[])val2);
+ else if (val1 instanceof char[])
+ return Arrays.equals((char[])val1, (char[])val2);
+ else if (val1 instanceof int[])
+ return Arrays.equals((int[])val1, (int[])val2);
+ else if (val1 instanceof long[])
+ return Arrays.equals((long[])val1, (long[])val2);
+ else if (val1 instanceof float[])
+ return Arrays.equals((float[])val1, (float[])val2);
+ else if (val1 instanceof double[])
+ return Arrays.equals((double[])val1, (double[])val2);
+ else
+ return Arrays.deepEquals((Object[])val1, (Object[])val2);
+ }
+
+ return false;
+ }
+
+ /**
+ * @param field Field.
+ * @return {@code True} if field is array.
+ */
+ private static boolean isArray(@Nullable Object field) {
+ return field != null && field.getClass().isArray();
+ }
+
+ /**
+ * Compare byte arrays.
+ *
+ * @param c1 Comparer 1.
+ * @param c2 Comparer 2.
+ * @param off Offset (where length is located).
+ * @return {@code True} if equal.
+ */
+ private static boolean compareByteArrays(BinarySerializedFieldComparator c1, BinarySerializedFieldComparator c2,
+ int off) {
+ int len = c1.readInt(off);
+
+ if (len != c2.readInt(off))
+ return false;
+ else {
+ off += 4;
+
+ if (c1.offheap()) {
+ if (c2.offheap())
+ // Case 1: both offheap.
+ return GridUnsafeMemory.compare(c1.curFieldPos + c1.ptr + off, c2.curFieldPos + c2.ptr + off, len);
+ }
+ else {
+ if (!c2.offheap()) {
+ // Case 2: both onheap.
+ for (int i = 0; i < len; i++) {
+ if (c1.arr[c1.curFieldPos + off + i] != c2.arr[c2.curFieldPos + off + i])
+ return false;
+ }
+
+ return true;
+ }
+ else {
+ // Swap.
+ BinarySerializedFieldComparator tmp = c1;
+ c1 = c2;
+ c2 = tmp;
+ }
+ }
+
+ // Case 3: offheap vs onheap.
+ assert c1.offheap() && !c2.offheap();
+
+ for (int i = 0; i < len; i++) {
+ if (BinaryPrimitives.readByte(c1.ptr, c1.curFieldPos + off + i) != c2.arr[c2.curFieldPos + off + i])
+ return false;
+ }
+
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index b304082..cb6e641 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -312,7 +312,7 @@ public class BinaryUtils {
* @param flag Flag.
* @return {@code True} if flag is set in flags.
*/
- static boolean isFlagSet(short flags, short flag) {
+ public static boolean isFlagSet(short flags, short flag) {
return (flags & flag) == flag;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
index 1de0a65..adaacdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
@@ -29,6 +29,7 @@ import java.util.Date;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryIdentityResolver;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.binary.BinaryWriter;
@@ -326,6 +327,48 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
}
/**
+ * Perform post-write hash code update if necessary.
+ *
+ * @param clsName Class name. Always null if class is registered.
+ */
+ public void postWriteHashCode(@Nullable String clsName) {
+ int typeId = clsName == null ? this.typeId : ctx.typeId(clsName);
+
+ BinaryIdentityResolver identity = ctx.identity(typeId);
+
+ if (identity != null) {
+ if (out.hasArray()) {
+ // Heap.
+ byte[] data = out.array();
+
+ BinaryObjectImpl obj = new BinaryObjectImpl(ctx, data, start);
+
+ short flags = BinaryPrimitives.readShort(data, start + GridBinaryMarshaller.FLAGS_POS);
+
+ BinaryPrimitives.writeShort(data, start + GridBinaryMarshaller.FLAGS_POS,
+ (short) (flags & ~BinaryUtils.FLAG_EMPTY_HASH_CODE));
+
+ BinaryPrimitives.writeInt(data, start + GridBinaryMarshaller.HASH_CODE_POS, identity.hashCode(obj));
+ }
+ else {
+ // Offheap.
+ long ptr = out.rawOffheapPointer();
+
+ assert ptr != 0;
+
+ BinaryObjectOffheapImpl obj = new BinaryObjectOffheapImpl(ctx, ptr, start, out.capacity());
+
+ short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
+
+ BinaryPrimitives.writeShort(ptr, start + GridBinaryMarshaller.FLAGS_POS,
+ (short) (flags & ~BinaryUtils.FLAG_EMPTY_HASH_CODE));
+
+ BinaryPrimitives.writeInt(ptr, start + GridBinaryMarshaller.HASH_CODE_POS, identity.hashCode(obj));
+ }
+ }
+ }
+
+ /**
* Pop schema.
*/
public void popSchema() {
@@ -337,8 +380,6 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
* @param val Byte array.
*/
public void write(byte[] val) {
- assert val != null;
-
out.writeByteArray(val);
}
@@ -348,8 +389,6 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
* @param len Length.
*/
public void write(byte[] val, int off, int len) {
- assert val != null;
-
out.write(val, off, len);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
index f0bc874..ddd2423 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
@@ -130,6 +130,8 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
* @param start Start.
*/
BinaryObjectBuilderImpl(BinaryBuilderReader reader, int start) {
+ assert reader != null;
+
this.reader = reader;
this.start = start;
this.flags = reader.readShortPositioned(start + GridBinaryMarshaller.FLAGS_POS);
@@ -193,6 +195,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
* @param writer Writer.
* @param serializer Serializer.
*/
+ @SuppressWarnings("ResultOfMethodCallIgnored")
void serializeTo(BinaryWriterExImpl writer, BinaryBuilderSerializer serializer) {
try {
writer.preWrite(registeredType ? null : clsNameToWrite);
@@ -357,6 +360,9 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
schemaReg.addSchema(curSchema.schemaId(), curSchema);
}
+
+ // Update hash code after schema is written.
+ writer.postWriteHashCode(registeredType ? null : clsNameToWrite);
}
finally {
writer.popSchema();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
index b6c30bb..46aa03d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
@@ -289,6 +289,11 @@ public abstract class BinaryAbstractInputStream extends BinaryAbstractStream
return 0;
}
+ /** {@inheritDoc} */
+ @Override public long rawOffheapPointer() {
+ return 0;
+ }
+
/**
* Ensure that there is enough data.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
index b9df68e..769031f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
@@ -244,6 +244,11 @@ public abstract class BinaryAbstractOutputStream extends BinaryAbstractStream
}
/** {@inheritDoc} */
+ @Override public long rawOffheapPointer() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
@Override public void unsafeEnsure(int cap) {
ensureCapacity(pos + cap);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
index b584373..b5edc02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
@@ -92,6 +92,11 @@ public final class BinaryHeapInputStream extends BinaryAbstractInputStream {
}
/** {@inheritDoc} */
+ @Override public int capacity() {
+ return data.length;
+ }
+
+ /** {@inheritDoc} */
@Override public byte[] array() {
return data;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
index 2c31641..f06c980 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
@@ -209,4 +209,9 @@ public final class BinaryHeapOutputStream extends BinaryAbstractOutputStream {
shift(8);
}
+
+ /** {@inheritDoc} */
+ @Override public int capacity() {
+ return data.length;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
index 9230846..9dc92c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
@@ -66,6 +66,11 @@ public class BinaryOffheapInputStream extends BinaryAbstractInputStream {
}
/** {@inheritDoc} */
+ @Override public int capacity() {
+ return cap;
+ }
+
+ /** {@inheritDoc} */
@Override public byte[] array() {
return arrayCopy();
}
@@ -147,4 +152,9 @@ public class BinaryOffheapInputStream extends BinaryAbstractInputStream {
@Override public long offheapPointer() {
return forceHeap ? 0 : ptr;
}
+
+ /** {@inheritDoc} */
+ @Override public long rawOffheapPointer() {
+ return ptr;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
index 1cb9f4f..be9f7d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
@@ -89,9 +89,7 @@ public class BinaryOffheapOutputStream extends BinaryAbstractOutputStream {
return ptr;
}
- /**
- * @return Capacity.
- */
+ /** {@inheritDoc} */
public int capacity() {
return cap;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
index b868199..5bdd644 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
@@ -42,12 +42,22 @@ public interface BinaryStream {
public byte[] arrayCopy();
/**
- * @return Offheap pointer if stream is offheap based, otherwise {@code 0}.
+ * @return Offheap pointer if stream is offheap based and "forceHeap" flag is not set; otherwise {@code 0}.
*/
public long offheapPointer();
/**
+ * @return Offheap pointer if stream is offheap based; otherwise {@code 0}.
+ */
+ public long rawOffheapPointer();
+
+ /**
* @return {@code True} is stream is array based.
*/
public boolean hasArray();
+
+ /**
+ * @return Total capacity.
+ */
+ public int capacity();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 6f0d9c4..5c4a147 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -55,6 +55,7 @@ import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.compute.ComputeTaskTimeoutException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
@@ -697,6 +698,13 @@ public class JdbcConnection implements Connection {
}
/**
+ * @return {@code true} if target node has DML support, {@code false} otherwise.
+ */
+ boolean isDmlSupported() {
+ return ignite.version().greaterThanEqual(1, 8, 0);
+ }
+
+ /**
* @return Local query flag.
*/
boolean isLocalQuery() {
@@ -736,6 +744,15 @@ public class JdbcConnection implements Connection {
}
/**
+ * @param sql Query.
+ * @return Native {@link PreparedStatement} of the underlying SQL engine (most likely H2), used to supply statement metadata.
+ */
+ PreparedStatement prepareNativeStatement(String sql) throws SQLException {
+ return ((IgniteCacheProxy) ignite().cache(cacheName())).context()
+ .kernalContext().query().prepareNativeStatement(cacheName(), sql);
+ }
+
+ /**
* JDBC connection validation task.
*/
private static class JdbcConnectionValidationTask implements IgniteCallable<Boolean> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
index a99f24c..57badd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
@@ -31,8 +31,10 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
/** SQL query. */
private final String sql;
- /** Arguments count. */
- private final int argsCnt;
+ /**
+ * H2's parsed statement to retrieve metadata from.
+ */
+ private PreparedStatement nativeStatement;
/**
* Creates new prepared statement.
@@ -44,12 +46,21 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
super(conn);
this.sql = sql;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addBatch(String sql) throws SQLException {
+ ensureNotClosed();
- argsCnt = sql.replaceAll("[^?]", "").length();
+ throw new SQLFeatureNotSupportedException("Adding new SQL command to batch not supported for prepared statement.");
}
+
+
/** {@inheritDoc} */
@Override public ResultSet executeQuery() throws SQLException {
+ ensureNotClosed();
+
ResultSet rs = executeQuery(sql);
args = null;
@@ -61,7 +72,11 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
@Override public int executeUpdate() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ int res = executeUpdate(sql);
+
+ args = null;
+
+ return res;
}
/** {@inheritDoc} */
@@ -163,6 +178,13 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
}
/** {@inheritDoc} */
+ @Override public void clearBatch() throws SQLException {
+ ensureNotClosed();
+
+ throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public void setObject(int paramIdx, Object x, int targetSqlType) throws SQLException {
setArgument(paramIdx, x);
}
@@ -181,7 +203,12 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
@Override public void addBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] executeBatch() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
}
/** {@inheritDoc} */
@@ -223,7 +250,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
@Override public ResultSetMetaData getMetaData() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Meta data for prepared statement is not supported.");
+ return getNativeStatement().getMetaData();
}
/** {@inheritDoc} */
@@ -255,7 +282,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
@Override public ParameterMetaData getParameterMetaData() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Meta data for prepared statement is not supported.");
+ return getNativeStatement().getParameterMetaData();
}
/** {@inheritDoc} */
@@ -400,12 +427,36 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
private void setArgument(int paramIdx, Object val) throws SQLException {
ensureNotClosed();
- if (paramIdx < 1 || paramIdx > argsCnt)
+ if (paramIdx < 1)
throw new SQLException("Parameter index is invalid: " + paramIdx);
+ ensureArgsSize(paramIdx);
+
+ args.set(paramIdx - 1, val);
+ }
+
+ /**
+ * Initialize {@link #args} and increase its capacity and size up to given argument if needed.
+ * @param size new expected size.
+ */
+ private void ensureArgsSize(int size) {
if (args == null)
- args = new Object[argsCnt];
+ args = new ArrayList<>(size);
+
+ args.ensureCapacity(size);
+
+ while (args.size() < size)
+ args.add(null);
+ }
+
+ /**
+ * @return H2's prepared statement to get metadata from.
+ * @throws SQLException if failed.
+ */
+ private PreparedStatement getNativeStatement() throws SQLException {
+ if (nativeStatement != null)
+ return nativeStatement;
- args[paramIdx - 1] = val;
+ return (nativeStatement = conn.prepareNativeStatement(sql));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
index c4911cb..0b23f9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
@@ -49,7 +49,10 @@ import org.apache.ignite.resources.IgniteInstanceResource;
* Not closed cursors will be removed after {@link #RMV_DELAY} milliseconds.
* This parameter can be configured via {@link IgniteSystemProperties#IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY}
* system property.
+ *
+ * Deprecated due to introduction of DML features - see {@link JdbcQueryTaskV2}.
*/
+@Deprecated
class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -178,7 +181,7 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
List<Object> row0 = new ArrayList<>(row.size());
for (Object val : row)
- row0.add(JdbcUtils.sqlType(val) ? val : val.toString());
+ row0.add(val == null || JdbcUtils.isSqlType(val.getClass()) ? val : val.toString());
rows.add(row0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV2.java
new file mode 100644
index 0000000..9093d15
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV2.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.io.Serializable;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.util.typedef.CAX;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Task for SQL queries execution through {@link IgniteJdbcDriver}.
+ * <p>
+ * Not closed cursors will be removed after {@link #RMV_DELAY} milliseconds.
+ * This parameter can be configured via {@link IgniteSystemProperties#IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY}
+ * system property.
+ */
+class JdbcQueryTaskV2 implements IgniteCallable<JdbcQueryTaskV2.QueryResult> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** How long to store open cursor. */
+ private static final long RMV_DELAY = IgniteSystemProperties.getLong(
+ IgniteSystemProperties.IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY, 600000);
+
+ /** Scheduler. */
+ private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
+
+ /** Open cursors. */
+ private static final ConcurrentMap<UUID, Cursor> CURSORS = new ConcurrentHashMap<>();
+
+ /** Ignite. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Uuid. */
+ private final UUID uuid;
+
+ /** Cache name. */
+ private final String cacheName;
+
+ /** Sql. */
+ private final String sql;
+
+ /** Operation type flag - query or not. */
+ private Boolean isQry;
+
+ /** Args. */
+ private final Object[] args;
+
+ /** Fetch size. */
+ private final int fetchSize;
+
+ /** Local execution flag. */
+ private final boolean loc;
+
+ /** Local query flag. */
+ private final boolean locQry;
+
+ /** Collocated query flag. */
+ private final boolean collocatedQry;
+
+ /** Distributed joins flag. */
+ private final boolean distributedJoins;
+
+ /**
+ * @param ignite Ignite.
+ * @param cacheName Cache name.
+ * @param sql Sql query.
+ * @param isQry Operation type flag - query or not - to enforce query type check.
+ * @param loc Local execution flag.
+ * @param args Args.
+ * @param fetchSize Fetch size.
+ * @param uuid UUID.
+ * @param locQry Local query flag.
+ * @param collocatedQry Collocated query flag.
+ * @param distributedJoins Distributed joins flag.
+ */
+ public JdbcQueryTaskV2(Ignite ignite, String cacheName, String sql,
+ Boolean isQry, boolean loc, Object[] args, int fetchSize, UUID uuid,
+ boolean locQry, boolean collocatedQry, boolean distributedJoins) {
+ this.ignite = ignite;
+ this.args = args;
+ this.uuid = uuid;
+ this.cacheName = cacheName;
+ this.sql = sql;
+ this.isQry = isQry;
+ this.fetchSize = fetchSize;
+ this.loc = loc;
+ this.locQry = locQry;
+ this.collocatedQry = collocatedQry;
+ this.distributedJoins = distributedJoins;
+ }
+
+ /** {@inheritDoc} */
+ @Override public JdbcQueryTaskV2.QueryResult call() throws Exception {
+ Cursor cursor = CURSORS.get(uuid);
+
+ List<String> tbls = null;
+ List<String> cols = null;
+ List<String> types = null;
+
+ boolean first;
+
+ if (first = (cursor == null)) {
+ IgniteCache<?, ?> cache = ignite.cache(cacheName);
+
+ // Don't create caches on server nodes in order to avoid data rebalancing.
+ boolean start = ignite.configuration().isClientMode();
+
+ if (cache == null && cacheName == null)
+ cache = ((IgniteKernal)ignite).context().cache().getOrStartPublicCache(start, !loc && locQry);
+
+ if (cache == null) {
+ if (cacheName == null)
+ throw new SQLException("Failed to execute query. No suitable caches found.");
+ else
+ throw new SQLException("Cache not found [cacheName=" + cacheName + ']');
+ }
+
+ SqlFieldsQuery qry = (isQry != null ? new JdbcSqlFieldsQuery(sql, isQry) : new SqlFieldsQuery(sql))
+ .setArgs(args);
+
+ qry.setPageSize(fetchSize);
+ qry.setLocal(locQry);
+ qry.setCollocated(collocatedQry);
+ qry.setDistributedJoins(distributedJoins);
+
+ QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)cache.query(qry);
+
+ if (isQry == null)
+ isQry = qryCursor.isQuery();
+
+ Collection<GridQueryFieldMetadata> meta = qryCursor.fieldsMeta();
+
+ tbls = new ArrayList<>(meta.size());
+ cols = new ArrayList<>(meta.size());
+ types = new ArrayList<>(meta.size());
+
+ for (GridQueryFieldMetadata desc : meta) {
+ tbls.add(desc.typeName());
+ cols.add(desc.fieldName().toUpperCase());
+ types.add(desc.fieldTypeName());
+ }
+
+ CURSORS.put(uuid, cursor = new Cursor(qryCursor, qryCursor.iterator()));
+ }
+
+ List<List<?>> rows = new ArrayList<>();
+
+ for (List<?> row : cursor) {
+ List<Object> row0 = new ArrayList<>(row.size());
+
+ for (Object val : row)
+ row0.add(val == null || JdbcUtils.isSqlType(val.getClass()) ? val : val.toString());
+
+ rows.add(row0);
+
+ if (rows.size() == fetchSize) // If fetchSize is 0 then unlimited
+ break;
+ }
+
+ boolean finished = !cursor.hasNext();
+
+ if (finished)
+ remove(uuid, cursor);
+ else if (first) {
+ if (!loc)
+ scheduleRemoval(uuid, RMV_DELAY);
+ }
+ else if (!loc && !CURSORS.replace(uuid, cursor, new Cursor(cursor.cursor, cursor.iter)))
+ assert !CURSORS.containsKey(uuid) : "Concurrent cursor modification.";
+
+ assert isQry != null : "Query flag must be set prior to returning result";
+
+ return new QueryResult(uuid, finished, isQry, rows, cols, tbls, types);
+ }
+
+ /**
+ * Schedules removal of stored cursor in case of remote query execution.
+ *
+ * @param uuid Cursor UUID.
+ * @param delay Delay in milliseconds.
+ */
+ private void scheduleRemoval(final UUID uuid, long delay) {
+ assert !loc;
+
+ SCHEDULER.schedule(new CAX() {
+ @Override public void applyx() {
+ while (true) {
+ Cursor c = CURSORS.get(uuid);
+
+ if (c == null)
+ break;
+
+ // If the cursor was accessed since last scheduling then reschedule.
+ long untouchedTime = U.currentTimeMillis() - c.lastAccessTime;
+
+ if (untouchedTime < RMV_DELAY) {
+ scheduleRemoval(uuid, RMV_DELAY - untouchedTime);
+
+ break;
+ }
+ else if (remove(uuid, c))
+ break;
+ }
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @param uuid Cursor UUID.
+ * @param c Cursor.
+ * @return {@code true} if the cursor was removed from the registry and closed.
+ */
+ private static boolean remove(UUID uuid, Cursor c) {
+ boolean rmv = CURSORS.remove(uuid, c);
+
+ if (rmv)
+ c.cursor.close();
+
+ return rmv;
+ }
+
+ /**
+ * Closes and removes cursor.
+ *
+ * @param uuid Cursor UUID.
+ */
+ static void remove(UUID uuid) {
+ Cursor c = CURSORS.remove(uuid);
+
+ if (c != null)
+ c.cursor.close();
+ }
+
+
+ /**
+ * Result of query execution.
+ */
+ static class QueryResult implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Uuid. */
+ private final UUID uuid;
+
+ /** Finished. */
+ private final boolean finished;
+
+ /** Result type - query or update. */
+ private final boolean isQry;
+
+ /** Rows. */
+ private final List<List<?>> rows;
+
+ /** Tables. */
+ private final List<String> tbls;
+
+ /** Columns. */
+ private final List<String> cols;
+
+ /** Types. */
+ private final List<String> types;
+
+ /**
+ * @param uuid UUID.
+ * @param finished Finished.
+ * @param isQry Result type flag: {@code true} for a query, {@code false} for an update.
+ * @param rows Rows.
+ * @param cols Columns.
+ * @param tbls Tables.
+ * @param types Types.
+ */
+ public QueryResult(UUID uuid, boolean finished, boolean isQry, List<List<?>> rows, List<String> cols,
+ List<String> tbls, List<String> types) {
+ this.isQry = isQry;
+ this.cols = cols;
+ this.uuid = uuid;
+ this.finished = finished;
+ this.rows = rows;
+ this.tbls = tbls;
+ this.types = types;
+ }
+
+ /**
+ * @return Query result rows.
+ */
+ public List<List<?>> getRows() {
+ return rows;
+ }
+
+ /**
+ * @return Tables metadata.
+ */
+ public List<String> getTbls() {
+ return tbls;
+ }
+
+ /**
+ * @return Columns metadata.
+ */
+ public List<String> getCols() {
+ return cols;
+ }
+
+ /**
+ * @return Types metadata.
+ */
+ public List<String> getTypes() {
+ return types;
+ }
+
+ /**
+ * @return Query UUID.
+ */
+ public UUID getUuid() {
+ return uuid;
+ }
+
+ /**
+ * @return {@code True} if it is finished query.
+ */
+ public boolean isFinished() {
+ return finished;
+ }
+
+ /**
+ * @return {@code true} if it is result of a query operation, not update; {@code false} otherwise.
+ */
+ public boolean isQuery() {
+ return isQry;
+ }
+ }
+
+ /**
+ * Cursor.
+ */
+ private static final class Cursor implements Iterable<List<?>> {
+ /** Cursor. */
+ final QueryCursor<List<?>> cursor;
+
+ /** Iterator. */
+ final Iterator<List<?>> iter;
+
+ /** Last access time. */
+ final long lastAccessTime;
+
+ /**
+ * @param cursor Cursor.
+ * @param iter Iterator.
+ */
+ private Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter) {
+ this.cursor = cursor;
+ this.iter = iter;
+ this.lastAccessTime = U.currentTimeMillis();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<List<?>> iterator() {
+ return iter;
+ }
+
+ /**
+ * @return {@code True} if cursor has next element.
+ */
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
index 1bf5223..c1a5f4c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.jetbrains.annotations.Nullable;
/**
@@ -146,6 +147,29 @@ public class JdbcResultSet implements ResultSet {
boolean loc = nodeId == null;
+ if (conn.isDmlSupported()) {
+ // Connections from new clients send queries with new tasks, so we have to continue in the same manner
+ JdbcQueryTaskV2 qryTask = new JdbcQueryTaskV2(loc ? ignite : null, conn.cacheName(), null, true, loc, null,
+ fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
+
+ try {
+ JdbcQueryTaskV2.QueryResult res =
+ loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
+
+ finished = res.isFinished();
+
+ it = res.getRows().iterator();
+
+ return next();
+ }
+ catch (IgniteSQLException e) {
+ throw e.toJdbcException();
+ }
+ catch (Exception e) {
+ throw new SQLException("Failed to query Ignite.", e);
+ }
+ }
+
JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(), null, loc, null,
fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
@@ -159,6 +183,9 @@ public class JdbcResultSet implements ResultSet {
return next();
}
+ catch (IgniteSQLException e) {
+ throw e.toJdbcException();
+ }
catch (Exception e) {
throw new SQLException("Failed to query Ignite.", e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
new file mode 100644
index 0000000..1b27296
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link SqlFieldsQuery} with JDBC flavor - it has additional flag indicating whether JDBC driver expects
+ * this query to return a result set or an update counter. This class is not intended for use outside JDBC driver.
+ */
+public final class JdbcSqlFieldsQuery extends SqlFieldsQuery {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Flag set by JDBC driver to enforce checks for correct operation type. */
+ private final boolean isQry;
+
+ /**
+ * @param sql SQL query.
+ * @param isQry Flag indicating whether this object denotes a query or an update operation.
+ */
+ JdbcSqlFieldsQuery(String sql, boolean isQry) {
+ super(sql);
+ this.isQry = isQry;
+ }
+
+ /**
+ * @return Flag indicating whether this object denotes a query or an update operation.
+ */
+ public boolean isQuery() {
+ return isQry;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index e187dc0..dbb2390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -17,19 +17,25 @@
package org.apache.ignite.internal.jdbc2;
+import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.FETCH_FORWARD;
@@ -44,7 +50,7 @@ public class JdbcStatement implements Statement {
private static final int DFLT_FETCH_SIZE = 1024;
/** Connection. */
- private final JdbcConnection conn;
+ protected final JdbcConnection conn;
/** Closed flag. */
private boolean closed;
@@ -53,10 +59,10 @@ public class JdbcStatement implements Statement {
private int maxRows;
/** Current result set. */
- private ResultSet rs;
+ protected ResultSet rs;
/** Query arguments. */
- protected Object[] args;
+ protected ArrayList<Object> args;
/** Fetch size. */
private int fetchSize = DFLT_FETCH_SIZE;
@@ -67,6 +73,12 @@ public class JdbcStatement implements Statement {
/** Fields indexes. */
Map<String, Integer> fieldsIdxs = new HashMap<>();
+ /** Current updated items count. */
+ long updateCnt = -1;
+
+ /** Batch statements. */
+ private List<String> batch;
+
/**
* Creates new statement.
*
@@ -79,12 +91,15 @@ public class JdbcStatement implements Statement {
}
/** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
@Override public ResultSet executeQuery(String sql) throws SQLException {
ensureNotClosed();
rs = null;
- if (sql == null || sql.isEmpty())
+ updateCnt = -1;
+
+ if (F.isEmpty(sql))
throw new SQLException("SQL query is empty");
Ignite ignite = conn.ignite();
@@ -95,8 +110,8 @@ public class JdbcStatement implements Statement {
boolean loc = nodeId == null;
- JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(),
- sql, loc, args, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
+ JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(), sql, loc, getArgs(),
+ fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
try {
JdbcQueryTask.QueryResult res =
@@ -111,6 +126,9 @@ public class JdbcStatement implements Statement {
return rs;
}
+ catch (IgniteSQLException e) {
+ throw e.toJdbcException();
+ }
catch (Exception e) {
throw new SQLException("Failed to query Ignite.", e);
}
@@ -120,7 +138,82 @@ public class JdbcStatement implements Statement {
@Override public int executeUpdate(String sql) throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ rs = null;
+
+ updateCnt = -1;
+
+ return doUpdate(sql, getArgs());
+ }
+
+ /**
+ * Runs an update (DML) statement and remembers the resulting counter in {@code updateCnt}.
+ * @param sql SQL query; an empty query is rejected.
+ * @param args Update arguments.
+ * @return Number of affected items, narrowed from {@code long} to {@code int}.
+ * @throws SQLException If the query is empty, DML is not supported by the connection, or execution fails.
+ */
+ int doUpdate(String sql, Object[] args) throws SQLException {
+ if (F.isEmpty(sql))
+ throw new SQLException("SQL query is empty");
+
+ Ignite ignite = conn.ignite();
+
+ UUID nodeId = conn.nodeId();
+
+ UUID uuid = UUID.randomUUID();
+
+ boolean loc = nodeId == null;
+
+ if (!conn.isDmlSupported())
+ throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer");
+
+ JdbcQueryTaskV2 qryTask = new JdbcQueryTaskV2(loc ? ignite : null, conn.cacheName(), sql, false, loc, args,
+ fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
+
+ try {
+ JdbcQueryTaskV2.QueryResult qryRes =
+ loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
+
+ Long res = updateCounterFromQueryResult(qryRes.getRows());
+
+ updateCnt = res;
+
+ return res.intValue();
+ }
+ catch (IgniteSQLException e) {
+ throw e.toJdbcException();
+ }
+ catch (SQLException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new SQLException("Failed to query Ignite.", e);
+ }
+ }
+
+ /**
+ * @param rows Query result rows; rows that are empty per {@code F.isEmpty} mean zero updated items.
+ * @return Update counter taken from the single cell of the single row, or {@code 0} if there are no rows.
+ * @throws SQLException If the result is not exactly one row holding exactly one {@code Long} value.
+ */
+ private static Long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
+ if (F.isEmpty(rows))
+ return 0L;
+
+ if (rows.size() != 1)
+ throw new SQLException("Expected number of rows of 1 for update operation");
+
+ List<?> row = rows.get(0);
+
+ if (row.size() != 1)
+ throw new SQLException("Expected row size of 1 for update operation");
+
+ Object objRes = row.get(0);
+
+ if (!(objRes instanceof Long))
+ throw new SQLException("Unexpected update result type");
+
+ return (Long) objRes;
}
/** {@inheritDoc} */
@@ -220,11 +313,59 @@ public class JdbcStatement implements Statement {
/** {@inheritDoc} */
@Override public boolean execute(String sql) throws SQLException {
+ if (!conn.isDmlSupported()) {
+ // We attempt to run a query without any checks as long as server does not support DML anyway,
+ // so it simply will throw an exception when given a DML statement instead of a query.
+ rs = executeQuery(sql);
+
+ return true;
+ }
+
ensureNotClosed();
- rs = executeQuery(sql);
+ rs = null;
+
+ updateCnt = -1;
+
+ if (F.isEmpty(sql))
+ throw new SQLException("SQL query is empty");
+
+ Ignite ignite = conn.ignite();
+
+ UUID nodeId = conn.nodeId();
- return true;
+ UUID uuid = UUID.randomUUID();
+
+ boolean loc = nodeId == null;
+
+ JdbcQueryTaskV2 qryTask = new JdbcQueryTaskV2(loc ? ignite : null, conn.cacheName(), sql, null, loc, getArgs(),
+ fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
+
+ try {
+ JdbcQueryTaskV2.QueryResult res =
+ loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
+
+ if (res.isQuery()) {
+ JdbcResultSet rs = new JdbcResultSet(uuid, this, res.getTbls(), res.getCols(), res.getTypes(),
+ res.getRows(), res.isFinished());
+
+ rs.setFetchSize(fetchSize);
+
+ resSets.add(rs);
+
+ this.rs = rs;
+ }
+ else
+ updateCnt = updateCounterFromQueryResult(res.getRows());
+
+ return res.isQuery();
+ }
+ catch (IgniteSQLException e) {
+ throw e.toJdbcException();
+ }
+ catch (Exception e) {
+ throw new SQLException("Failed to query Ignite.", e);
+ }
}
/** {@inheritDoc} */
@@ -242,7 +383,11 @@ public class JdbcStatement implements Statement {
@Override public int getUpdateCount() throws SQLException {
ensureNotClosed();
- return -1;
+ long res = updateCnt;
+
+ updateCnt = -1;
+
+ return Long.valueOf(res).intValue();
}
/** {@inheritDoc} */
@@ -302,21 +447,27 @@ public class JdbcStatement implements Statement {
@Override public void addBatch(String sql) throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ if (F.isEmpty(sql))
+ throw new SQLException("SQL query is empty");
+
+ if (batch == null)
+ batch = new ArrayList<>();
+
+ batch.add(sql);
}
/** {@inheritDoc} */
@Override public void clearBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ batch = null;
}
/** {@inheritDoc} */
@Override public int[] executeBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
}
/** {@inheritDoc} */
@@ -340,28 +491,37 @@ public class JdbcStatement implements Statement {
@Override public ResultSet getGeneratedKeys() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
}
/** {@inheritDoc} */
@Override public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ if (autoGeneratedKeys == RETURN_GENERATED_KEYS)
+ throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+
+ return executeUpdate(sql);
}
/** {@inheritDoc} */
@Override public int executeUpdate(String sql, int[] colIndexes) throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ if (!F.isEmpty(colIndexes))
+ throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+
+ return executeUpdate(sql);
}
/** {@inheritDoc} */
@Override public int executeUpdate(String sql, String[] colNames) throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ if (!F.isEmpty(colNames))
+ throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+
+ return executeUpdate(sql);
}
/** {@inheritDoc} */
@@ -369,7 +529,7 @@ public class JdbcStatement implements Statement {
ensureNotClosed();
if (autoGeneratedKeys == RETURN_GENERATED_KEYS)
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return execute(sql);
}
@@ -378,8 +538,8 @@ public class JdbcStatement implements Statement {
@Override public boolean execute(String sql, int[] colIndexes) throws SQLException {
ensureNotClosed();
- if (colIndexes != null && colIndexes.length > 0)
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ if (!F.isEmpty(colIndexes))
+ throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return execute(sql);
}
@@ -388,8 +548,8 @@ public class JdbcStatement implements Statement {
@Override public boolean execute(String sql, String[] colNames) throws SQLException {
ensureNotClosed();
- if (colNames != null && colNames.length > 0)
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ if (!F.isEmpty(colNames))
+ throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return execute(sql);
}
@@ -448,11 +608,18 @@ public class JdbcStatement implements Statement {
}
/**
+ * @return Current statement arguments copied into a new array, or {@code null} if no arguments are set.
+ */
+ protected final Object[] getArgs() {
+ return args != null ? args.toArray() : null;
+ }
+
+ /**
* Ensures that statement is not closed.
*
* @throws SQLException If statement is closed.
*/
- protected void ensureNotClosed() throws SQLException {
+ void ensureNotClosed() throws SQLException {
if (closed)
throw new SQLException("Statement is closed.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java
index b519340..38a838e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.jdbc2;
-import java.math.BigDecimal;
import java.net.URL;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Date;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+
import static java.sql.Types.BIGINT;
import static java.sql.Types.BINARY;
import static java.sql.Types.BOOLEAN;
@@ -132,24 +133,12 @@ public class JdbcUtils {
}
/**
- * Checks whether type of the object is SQL-complaint.
+ * Checks whether a class is SQL-compliant.
*
- * @param obj Object.
- * @return Whether type of the object is SQL-complaint.
+ * @param cls Class to check.
+ * @return {@code true} if {@code GridQueryProcessor.isSqlType(cls)} holds or {@code cls} is {@code URL.class}.
*/
- public static boolean sqlType(Object obj) {
- return obj == null ||
- obj instanceof BigDecimal ||
- obj instanceof Boolean ||
- obj instanceof Byte ||
- obj instanceof byte[] ||
- obj instanceof java.util.Date ||
- obj instanceof Double ||
- obj instanceof Float ||
- obj instanceof Integer ||
- obj instanceof Long ||
- obj instanceof Short ||
- obj instanceof String ||
- obj instanceof URL;
+ static boolean isSqlType(Class<?> cls) {
+ return GridQueryProcessor.isSqlType(cls) || cls == URL.class;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 5a46d65..852c432 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -17,7 +17,12 @@
package org.apache.ignite.internal.processors.cache;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.sql.ResultSet;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -43,7 +48,10 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
AtomicReferenceFieldUpdater.newUpdater(QueryCursorImpl.class, State.class, "state");
/** Query executor. */
- private Iterable<T> iterExec;
+ private final Iterable<T> iterExec;
+
+ /** Result type flag - result set or update counter. */
+ private final boolean isQry;
/** */
private Iterator<T> iter;
@@ -62,8 +70,7 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
* @param cancel Cancellation closure.
*/
public QueryCursorImpl(Iterable<T> iterExec, GridQueryCancel cancel) {
- this.iterExec = iterExec;
- this.cancel = cancel;
+ this(iterExec, cancel, true);
}
/**
@@ -73,6 +80,16 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
this(iterExec, null);
}
+ /**
+ * @param iterExec Query executor.
+ * @param isQry Result type flag - {@code true} for query, {@code false} for update operation.
+ */
+ public QueryCursorImpl(Iterable<T> iterExec, GridQueryCancel cancel, boolean isQry) {
+ this.iterExec = iterExec;
+ this.cancel = cancel;
+ this.isQry = isQry;
+ }
+
/** {@inheritDoc} */
@Override public Iterator<T> iterator() {
if (!STATE_UPDATER.compareAndSet(this, IDLE, EXECUTION))
@@ -154,6 +171,14 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
}
/**
+ * @return {@code true} if this cursor represents the result set of a query (see {@link ResultSet}),
+ * {@code false} if it carries the update counter of a modifying operation such as INSERT, UPDATE, or DELETE.
+ */
+ public boolean isQuery() {
+ return isQry;
+ }
+
+ /**
* @param fieldsMeta SQL Fields query result metadata.
*/
public void fieldsMeta(List<GridQueryFieldMetadata> fieldsMeta) {