You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/05 12:38:09 UTC
[05/52] ignite git commit: IGNITE-2294: Implemented DML.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/test/java/org/apache/ignite/internal/binary/BinarySerialiedFieldComparatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinarySerialiedFieldComparatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinarySerialiedFieldComparatorSelfTest.java
new file mode 100644
index 0000000..c3d1951
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinarySerialiedFieldComparatorSelfTest.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary;
+
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Unit tests for serialized field comparer.
+ */
+public class BinarySerialiedFieldComparatorSelfTest extends GridCommonAbstractTest {
+ /** Type counter. */
+ private static final AtomicInteger TYPE_CTR = new AtomicInteger();
+
+ /** Single field name. */
+ private static final String FIELD_SINGLE = "single";
+
+ /** Pointers to release. */
+ private final Set<Long> ptrs = new ConcurrentHashSet<>();
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ TYPE_CTR.incrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ for (Long ptr : ptrs)
+ GridUnsafe.freeMemory(ptr);
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ return cfg;
+ }
+
+ /**
+ * Test byte fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testByte() throws Exception {
+ checkTwoValues((byte)1, (byte)2);
+ }
+
+ /**
+ * Test boolean fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBoolean() throws Exception {
+ checkTwoValues(true, false);
+ }
+
+ /**
+ * Test short fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testShort() throws Exception {
+ checkTwoValues((short)1, (short)2);
+ }
+
+ /**
+ * Test char fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testChar() throws Exception {
+ checkTwoValues('a', 'b');
+ }
+
+ /**
+ * Test int fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInt() throws Exception {
+ checkTwoValues(1, 2);
+ }
+
+ /**
+ * Test long fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testLong() throws Exception {
+ checkTwoValues(1L, 2L);
+ }
+
+ /**
+ * Test float fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFloat() throws Exception {
+ checkTwoValues(1.0f, 2.0f);
+ }
+
+ /**
+ * Test double fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDouble() throws Exception {
+ checkTwoValues(1.0d, 2.0d);
+ }
+
+ /**
+ * Test string fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testString() throws Exception {
+ checkTwoValues("str1", "str2");
+ }
+
+ /**
+ * Test date fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDate() throws Exception {
+ long time = System.currentTimeMillis();
+
+ checkTwoValues(new Date(time), new Date(time + 100));
+ }
+
+ /**
+ * Test date fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTimestamp() throws Exception {
+ long time = System.currentTimeMillis();
+
+ checkTwoValues(new Timestamp(time), new Timestamp(time + 100));
+ }
+
+ /**
+ * Test UUID fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testUuid() throws Exception {
+ checkTwoValues(UUID.randomUUID(), UUID.randomUUID());
+ }
+
+ /**
+ * Test decimal fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDecimal() throws Exception {
+ checkTwoValues(new BigDecimal("12.3E+7"), new BigDecimal("12.4E+7"));
+ checkTwoValues(new BigDecimal("12.3E+7"), new BigDecimal("12.3E+8"));
+ }
+
+ /**
+ * Test object fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInnerObject() throws Exception {
+ checkTwoValues(new InnerClass(1), new InnerClass(2));
+ }
+
+ /**
+ * Test byte array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testByteArray() throws Exception {
+ checkTwoValues(new byte[] { 1, 2 }, new byte[] { 1, 3 });
+ checkTwoValues(new byte[] { 1, 2 }, new byte[] { 1 });
+ checkTwoValues(new byte[] { 1, 2 }, new byte[] { 3 });
+ checkTwoValues(new byte[] { 1, 2 }, new byte[] { 1, 2, 3 });
+ }
+
+ /**
+ * Test boolean array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBooleanArray() throws Exception {
+ checkTwoValues(new boolean[] { true, false }, new boolean[] { false, true });
+ checkTwoValues(new boolean[] { true, false }, new boolean[] { true });
+ checkTwoValues(new boolean[] { true, false }, new boolean[] { false });
+ checkTwoValues(new boolean[] { true, false }, new boolean[] { true, false, true });
+ }
+
+ /**
+ * Test short array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testShortArray() throws Exception {
+ checkTwoValues(new short[] { 1, 2 }, new short[] { 1, 3 });
+ checkTwoValues(new short[] { 1, 2 }, new short[] { 1 });
+ checkTwoValues(new short[] { 1, 2 }, new short[] { 3 });
+ checkTwoValues(new short[] { 1, 2 }, new short[] { 1, 2, 3 });
+ }
+
+ /**
+ * Test char array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCharArray() throws Exception {
+ checkTwoValues(new char[] { 1, 2 }, new char[] { 1, 3 });
+ checkTwoValues(new char[] { 1, 2 }, new char[] { 1 });
+ checkTwoValues(new char[] { 1, 2 }, new char[] { 3 });
+ checkTwoValues(new char[] { 1, 2 }, new char[] { 1, 2, 3 });
+ }
+
+ /**
+ * Test int array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIntArray() throws Exception {
+ checkTwoValues(new int[] { 1, 2 }, new int[] { 1, 3 });
+ checkTwoValues(new int[] { 1, 2 }, new int[] { 1 });
+ checkTwoValues(new int[] { 1, 2 }, new int[] { 3 });
+ checkTwoValues(new int[] { 1, 2 }, new int[] { 1, 2, 3 });
+ }
+
+ /**
+ * Test long array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testLongArray() throws Exception {
+ checkTwoValues(new long[] { 1, 2 }, new long[] { 1, 3 });
+ checkTwoValues(new long[] { 1, 2 }, new long[] { 1 });
+ checkTwoValues(new long[] { 1, 2 }, new long[] { 3 });
+ checkTwoValues(new long[] { 1, 2 }, new long[] { 1, 2, 3 });
+ }
+
+ /**
+ * Test float array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFloatArray() throws Exception {
+ checkTwoValues(new float[] { 1.0f, 2.0f }, new float[] { 1.0f, 3.0f });
+ checkTwoValues(new float[] { 1.0f, 2.0f }, new float[] { 1.0f });
+ checkTwoValues(new float[] { 1.0f, 2.0f }, new float[] { 3.0f });
+ checkTwoValues(new float[] { 1.0f, 2.0f }, new float[] { 1.0f, 2.0f, 3.0f });
+ }
+
+ /**
+ * Test double array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDoubleArray() throws Exception {
+ checkTwoValues(new double[] { 1.0d, 2.0d }, new double[] { 1.0d, 3.0d });
+ checkTwoValues(new double[] { 1.0d, 2.0d }, new double[] { 1.0d });
+ checkTwoValues(new double[] { 1.0d, 2.0d }, new double[] { 3.0d });
+ checkTwoValues(new double[] { 1.0d, 2.0d }, new double[] { 1.0d, 2.0d, 3.0d });
+ }
+
+ /**
+ * Test string array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testStringArray() throws Exception {
+ checkTwoValues(new String[] { "a", "b" }, new String[] { "a", "c" });
+ checkTwoValues(new String[] { "a", "b" }, new String[] { "a" });
+ checkTwoValues(new String[] { "a", "b" }, new String[] { "c" });
+ checkTwoValues(new String[] { "a", "b" }, new String[] { "a", "b", "c" });
+ }
+
+ /**
+ * Test date array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDateArray() throws Exception {
+ long curTime = System.currentTimeMillis();
+
+ Date v1 = new Date(curTime);
+ Date v2 = new Date(curTime + 1000);
+ Date v3 = new Date(curTime + 2000);
+
+ checkTwoValues(new Date[] { v1, v2 }, new Date[] { v1, v3 });
+ checkTwoValues(new Date[] { v1, v2 }, new Date[] { v1 });
+ checkTwoValues(new Date[] { v1, v2 }, new Date[] { v3 });
+ checkTwoValues(new Date[] { v1, v2 }, new Date[] { v1, v2, v3 });
+ }
+
+ /**
+ * Test timestamp array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTimestampArray() throws Exception {
+ long curTime = System.currentTimeMillis();
+
+ Timestamp v1 = new Timestamp(curTime);
+ Timestamp v2 = new Timestamp(curTime + 1000);
+ Timestamp v3 = new Timestamp(curTime + 2000);
+
+ checkTwoValues(new Timestamp[] { v1, v2 }, new Timestamp[] { v1, v3 });
+ checkTwoValues(new Timestamp[] { v1, v2 }, new Timestamp[] { v1 });
+ checkTwoValues(new Timestamp[] { v1, v2 }, new Timestamp[] { v3 });
+ checkTwoValues(new Timestamp[] { v1, v2 }, new Timestamp[] { v1, v2, v3 });
+ }
+
+ /**
+ * Test UUID array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testUuidArray() throws Exception {
+ UUID v1 = UUID.randomUUID();
+ UUID v2 = UUID.randomUUID();
+ UUID v3 = UUID.randomUUID();
+
+ checkTwoValues(new UUID[] { v1, v2 }, new UUID[] { v1, v3 });
+ checkTwoValues(new UUID[] { v1, v2 }, new UUID[] { v1 });
+ checkTwoValues(new UUID[] { v1, v2 }, new UUID[] { v3 });
+ checkTwoValues(new UUID[] { v1, v2 }, new UUID[] { v1, v2, v3 });
+ }
+
+ /**
+ * Test decimal array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDecimalArray() throws Exception {
+ BigDecimal v1 = new BigDecimal("12.3E+7");
+ BigDecimal v2 = new BigDecimal("12.4E+7");
+ BigDecimal v3 = new BigDecimal("12.5E+7");
+
+ checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1, v3 });
+ checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1 });
+ checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v3 });
+ checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1, v2, v3 });
+
+ v2 = new BigDecimal("12.3E+8");
+ v3 = new BigDecimal("12.3E+9");
+
+ checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1, v3 });
+ checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1 });
+ checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v3 });
+ checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1, v2, v3 });
+ }
+
+ /**
+ * Test object array fields.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInnerObjectArray() throws Exception {
+ InnerClass v1 = new InnerClass(1);
+ InnerClass v2 = new InnerClass(2);
+ InnerClass v3 = new InnerClass(3);
+
+ checkTwoValues(new InnerClass[] { v1, v2 }, new InnerClass[] { v1, v3 });
+ checkTwoValues(new InnerClass[] { v1, v2 }, new InnerClass[] { v1 });
+ checkTwoValues(new InnerClass[] { v1, v2 }, new InnerClass[] { v3 });
+ checkTwoValues(new InnerClass[] { v1, v2 }, new InnerClass[] { v1, v2, v3 });
+ }
+
+ /**
+ * Check two different not-null values.
+ *
+ * @throws Exception If failed.
+ */
+ public void checkTwoValues(Object val1, Object val2) throws Exception {
+ checkTwoValues(val1, val2, false, false);
+ checkTwoValues(val1, val2, false, true);
+ checkTwoValues(val1, val2, true, false);
+ checkTwoValues(val1, val2, true, true);
+ }
+
+ /**
+ * Check two different not-null values.
+ *
+ * @param val1 Value 1.
+ * @param val2 Value 2.
+ * @param offheap1 Offheap flag 1.
+ * @param offheap2 Offheap flag 2.
+ * @throws Exception If failed.
+ */
+ public void checkTwoValues(Object val1, Object val2, boolean offheap1, boolean offheap2) throws Exception {
+ assertNotNull(val1);
+ assertNotNull(val2);
+
+ compareSingle(convert(buildSingle(val1), offheap1), convert(buildSingle(val1), offheap2), true);
+ compareSingle(convert(buildSingle(val1), offheap1), convert(buildSingle(val2), offheap2), false);
+ compareSingle(convert(buildSingle(val1), offheap1), convert(buildSingle(null), offheap2), false);
+ compareSingle(convert(buildSingle(val1), offheap1), convert(buildEmpty(), offheap2), false);
+
+ compareSingle(convert(buildSingle(val2), offheap1), convert(buildSingle(val1), offheap2), false);
+ compareSingle(convert(buildSingle(val2), offheap1), convert(buildSingle(val2), offheap2), true);
+ compareSingle(convert(buildSingle(val2), offheap1), convert(buildSingle(null), offheap2), false);
+ compareSingle(convert(buildSingle(val2), offheap1), convert(buildEmpty(), offheap2), false);
+
+ compareSingle(convert(buildSingle(null), offheap1), convert(buildSingle(val1), offheap2), false);
+ compareSingle(convert(buildSingle(null), offheap1), convert(buildSingle(val2), offheap2), false);
+ compareSingle(convert(buildSingle(null), offheap1), convert(buildSingle(null), offheap2), true);
+ compareSingle(convert(buildSingle(null), offheap1), convert(buildEmpty(), offheap2), true);
+
+ compareSingle(convert(buildEmpty(), offheap1), convert(buildSingle(val1), offheap2), false);
+ compareSingle(convert(buildEmpty(), offheap1), convert(buildSingle(val2), offheap2), false);
+ compareSingle(convert(buildEmpty(), offheap1), convert(buildSingle(null), offheap2), true);
+ compareSingle(convert(buildEmpty(), offheap1), convert(buildEmpty(), offheap2), true);
+ }
+
+ /**
+ * Compare single field.
+ *
+ * @param first First object.
+ * @param second Second object.
+ * @param expRes Expected result.
+ */
+ private void compareSingle(BinaryObjectExImpl first, BinaryObjectExImpl second, boolean expRes) {
+ BinarySerializedFieldComparator firstComp = first.createFieldComparator();
+ BinarySerializedFieldComparator secondComp = second.createFieldComparator();
+
+ // Compare expected result.
+ firstComp.findField(singleFieldOrder(first));
+ secondComp.findField(singleFieldOrder(second));
+
+ assertEquals(expRes, BinarySerializedFieldComparator.equals(firstComp, secondComp));
+ }
+
+ /**
+ * Get single field order.
+ *
+ * @param obj Object.
+ * @return Order.
+ */
+ private int singleFieldOrder(BinaryObjectExImpl obj) {
+ return obj.hasField(FIELD_SINGLE) ? 0 : BinarySchema.ORDER_NOT_FOUND;
+ }
+
+ /**
+ * Convert binary object to it's final state.
+ *
+ * @param obj Object.
+ * @param offheap Offheap flag.
+ * @return Result.
+ */
+ private BinaryObjectExImpl convert(BinaryObjectExImpl obj, boolean offheap) {
+ if (offheap) {
+ byte[] arr = obj.array();
+
+ long ptr = GridUnsafe.allocateMemory(arr.length);
+
+ ptrs.add(ptr);
+
+ GridUnsafe.copyMemory(arr, GridUnsafe.BYTE_ARR_OFF, null, ptr, arr.length);
+
+ obj = new BinaryObjectOffheapImpl(obj.context(), ptr, 0, obj.array().length);
+ }
+
+ return obj;
+ }
+
+ /**
+ * Build object with a single field.
+ *
+ * @param val Value.
+ * @return Result.
+ */
+ private BinaryObjectImpl buildSingle(Object val) {
+ return build(FIELD_SINGLE, val);
+ }
+
+ /**
+ * Build empty object.
+ *
+ * @return Empty object.
+ */
+ private BinaryObjectImpl buildEmpty() {
+ return build();
+ }
+
+ /**
+ * Build object.
+ *
+ * @param parts Parts.
+ * @return Result.
+ */
+ private BinaryObjectImpl build(Object... parts) {
+ String typeName = "Type" + TYPE_CTR.get();
+
+ BinaryObjectBuilder builder = grid().binary().builder(typeName);
+
+ if (!F.isEmpty(parts)) {
+ for (int i = 0; i < parts.length; )
+ builder.setField((String)parts[i++], parts[i++]);
+ }
+
+ return (BinaryObjectImpl) builder.build();
+ }
+
+ /**
+ * Inner class.
+ */
+ @SuppressWarnings("unused")
+ private static class InnerClass {
+ /** Value. */
+ private int val;
+
+ /**
+ * Constructor.
+ *
+ * @param val Value.
+ */
+ public InnerClass(int val) {
+ this.val = val;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java
index 150c245..2a177ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -36,17 +37,22 @@ import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryArrayIdentityResolver;
import org.apache.ignite.binary.BinaryNameMapper;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryIdentityResolver;
import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
import org.apache.ignite.binary.BinaryWriter;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
@@ -55,9 +61,12 @@ import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
+import org.apache.ignite.binary.BinaryFieldIdentityResolver;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -103,13 +112,79 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
cfg.setDiscoverySpi(disco);
+ CacheConfiguration cacheCfg = createCacheConfig();
+
+ cacheCfg.setCacheStoreFactory(singletonFactory(new TestStore()));
+
+ CacheConfiguration binKeysCacheCfg = createCacheConfig();
+
+ binKeysCacheCfg.setCacheStoreFactory(singletonFactory(new MapCacheStoreStrategy.MapCacheStore()));
+ binKeysCacheCfg.setStoreKeepBinary(true);
+ binKeysCacheCfg.setName("BinKeysCache");
+
+ cfg.setCacheConfiguration(cacheCfg, binKeysCacheCfg);
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ List<BinaryTypeConfiguration> binTypes = new ArrayList<>();
+
+ binTypes.add(new BinaryTypeConfiguration() {{
+ setTypeName("ArrayHashedKey");
+
+ setIdentityResolver(new BinaryArrayIdentityResolver());
+ }});
+
+ binTypes.add(new BinaryTypeConfiguration() {{
+ setTypeName("FieldsHashedKey");
+
+ BinaryFieldIdentityResolver id = new BinaryFieldIdentityResolver();
+ id.setFieldNames("fld1", "fld3");
+
+ setIdentityResolver(id);
+ }});
+
+ binTypes.add(new BinaryTypeConfiguration() {{
+ setTypeName("CustomHashedKey");
+
+ setIdentityResolver(new IdentityResolver());
+ }});
+
+ binTypes.add(new BinaryTypeConfiguration() {{
+ setTypeName(ComplexBinaryFieldsListHashedKey.class.getName());
+
+ BinaryFieldIdentityResolver id = new BinaryFieldIdentityResolver();
+
+ id.setFieldNames("secondField", "thirdField");
+
+ setIdentityResolver(id);
+ }});
+
+ BinaryConfiguration binCfg = new BinaryConfiguration();
+ binCfg.setTypeConfigurations(binTypes);
+
+ cfg.setBinaryConfiguration(binCfg);
+
+ CacheKeyConfiguration arrayHashCfg = new CacheKeyConfiguration("ArrayHashedKey", "fld1");
+ CacheKeyConfiguration fieldsHashCfg = new CacheKeyConfiguration("FieldsHashedKey", "fld1");
+ CacheKeyConfiguration customHashCfg = new CacheKeyConfiguration("CustomHashedKey", "fld1");
+
+ cfg.setCacheKeyConfiguration(arrayHashCfg, fieldsHashCfg, customHashCfg);
+
+ GridCacheBinaryObjectsAbstractSelfTest.cfg = cfg;
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache configuration with basic settings.
+ */
+ @SuppressWarnings("unchecked")
+ private CacheConfiguration createCacheConfig() {
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setCacheMode(cacheMode());
cacheCfg.setAtomicityMode(atomicityMode());
cacheCfg.setNearConfiguration(nearConfiguration());
cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
- cacheCfg.setCacheStoreFactory(singletonFactory(new TestStore()));
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);
cacheCfg.setLoadPreviousValue(true);
@@ -120,13 +195,7 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
cacheCfg.setOffHeapMaxMemory(0);
}
- cfg.setCacheConfiguration(cacheCfg);
-
- cfg.setMarshaller(new BinaryMarshaller());
-
- this.cfg = cfg;
-
- return cfg;
+ return cacheCfg;
}
/**
@@ -924,6 +993,125 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
}
/**
+ *
+ */
+ @SuppressWarnings("unchecked")
+ public void testCrossFormatObjectsIdentity() {
+ IgniteCache c = binKeysCache();
+
+ c.put(new ComplexBinaryFieldsListHashedKey(), "zzz");
+
+ // Now let's build an identical key for get
+ BinaryObjectBuilder bldr = grid(0).binary().builder(ComplexBinaryFieldsListHashedKey.class.getName());
+
+ bldr.setField("firstField", 365);
+ bldr.setField("secondField", "value");
+ bldr.setField("thirdField", 0x1020304050607080L);
+
+ BinaryObject binKey = bldr.build();
+
+ assertEquals("zzz", c.get(binKey));
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("unchecked")
+ public void testPutWithArrayHashing() {
+ IgniteCache c = binKeysCache();
+
+ {
+ BinaryObjectBuilder bldr = grid(0).binary().builder("ArrayHashedKey");
+
+ BinaryObject binKey = bldr.setField("fld1", 5).setField("fld2", 1).setField("fld3", "abc").build();
+
+ c.put(binKey, "zzz");
+ }
+
+ // Now let's build an identical key for get.
+ {
+ BinaryObjectBuilder bldr = grid(0).binary().builder("ArrayHashedKey");
+
+ BinaryObject binKey = bldr.setField("fld1", 5).setField("fld2", 1).setField("fld3", "abc").build();
+
+ assertEquals("zzz", c.get(binKey));
+ }
+
+ // Now let's build not identical key for get.
+ {
+ BinaryObjectBuilder bldr = grid(0).binary().builder("ArrayHashedKey");
+
+ BinaryObject binKey = bldr.setField("fld1", 5).setField("fld2", 100).setField("fld3", "abc").build();
+
+ assertNull(c.get(binKey));
+ }
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("unchecked")
+ public void testPutWithFieldsHashing() {
+ IgniteCache c = binKeysCache();
+
+ {
+ BinaryObjectBuilder bldr = grid(0).binary().builder("FieldsHashedKey");
+
+ bldr.setField("fld1", 5);
+ bldr.setField("fld2", 1);
+ bldr.setField("fld3", "abc");
+
+ BinaryObject binKey = bldr.build();
+
+ c.put(binKey, "zzz");
+ }
+
+ // Now let's build an identical key for get
+ {
+ BinaryObjectBuilder bldr = grid(0).binary().builder("FieldsHashedKey");
+
+ bldr.setField("fld1", 5);
+ bldr.setField("fld2", 100); // This one does not participate in hashing
+ bldr.setField("fld3", "abc");
+
+ BinaryObject binKey = bldr.build();
+
+ assertEquals("zzz", c.get(binKey));
+ }
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("unchecked")
+ public void testPutWithCustomHashing() {
+ IgniteCache c = binKeysCache();
+
+ {
+ BinaryObjectBuilder bldr = grid(0).binary().builder("CustomHashedKey");
+
+ bldr.setField("fld1", 5);
+ bldr.setField("fld2", "abc");
+
+ BinaryObject binKey = bldr.build();
+
+ c.put(binKey, "zzz");
+ }
+
+ // Now let's build an identical key for get
+ {
+ BinaryObjectBuilder bldr = grid(0).binary().builder("CustomHashedKey");
+
+ bldr.setField("fld1", 5);
+ bldr.setField("fld2", "xxx");
+
+ BinaryObject binKey = bldr.build();
+
+ assertEquals("zzz", c.get(binKey));
+ }
+ }
+
+ /**
* @throws Exception if failed.
*/
public void testKeepBinaryTxOverwrite() throws Exception {
@@ -1034,6 +1222,13 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
}
/**
+ * @return Cache tuned to utilize classless binary objects as keys.
+ */
+ private <K, V> IgniteCache<K, V> binKeysCache() {
+ return ignite(0).cache("BinKeysCache").withKeepBinary();
+ }
+
+ /**
* @param key Key.
* @throws Exception If failed.
*/
@@ -1221,4 +1416,53 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
// No-op.
}
}
+
+ /**
+ *
+ */
+ private final static class IdentityResolver implements BinaryIdentityResolver {
+ /** {@inheritDoc} */
+ @Override public int hashCode(BinaryObject builder) {
+ return (Integer) builder.field("fld1") * 31 / 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(BinaryObject o1, BinaryObject o2) {
+ return o1 == o2 || (o1 != null && o2 != null && F.eq(o1.field("fld1"), o2.field("fld1")));
+
+ }
+ }
+
+ /**
+ * Key to test puts and gets with
+ */
+ @SuppressWarnings({"ConstantConditions", "unused"})
+ private final static class ComplexBinaryFieldsListHashedKey {
+ /** */
+ private final Integer firstField = 1;
+
+ /** */
+ private final String secondField = "value";
+
+ /** */
+ private final Long thirdField = 0x1020304050607080L;
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ComplexBinaryFieldsListHashedKey that = (ComplexBinaryFieldsListHashedKey) o;
+
+ return secondField.equals(that.secondField) &&
+ thirdField.equals(that.thirdField);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = secondField.hashCode();
+ res = 31 * res + thirdField.hashCode();
+ return res;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
index c1d9974..3496dbf 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
@@ -18,19 +18,23 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.binary.BinaryArrayIdentityResolverSelfTest;
import org.apache.ignite.internal.binary.BinaryBasicIdMapperSelfTest;
import org.apache.ignite.internal.binary.BinaryBasicNameMapperSelfTest;
import org.apache.ignite.internal.binary.BinaryConfigurationConsistencySelfTest;
import org.apache.ignite.internal.binary.BinaryEnumsSelfTest;
+import org.apache.ignite.internal.binary.BinaryFieldIdentityResolverSelfTest;
import org.apache.ignite.internal.binary.BinaryFieldsHeapSelfTest;
import org.apache.ignite.internal.binary.BinaryFieldsOffheapSelfTest;
import org.apache.ignite.internal.binary.BinaryFooterOffsetsHeapSelfTest;
import org.apache.ignite.internal.binary.BinaryFooterOffsetsOffheapSelfTest;
+import org.apache.ignite.internal.binary.BinaryIdentityResolverConfigurationSelfTest;
import org.apache.ignite.internal.binary.BinaryMarshallerSelfTest;
import org.apache.ignite.internal.binary.BinaryObjectBuilderAdditionalSelfTest;
import org.apache.ignite.internal.binary.BinaryObjectBuilderDefaultMappersSelfTest;
import org.apache.ignite.internal.binary.BinaryObjectBuilderSimpleNameLowerCaseMappersSelfTest;
import org.apache.ignite.internal.binary.BinaryObjectToStringSelfTest;
+import org.apache.ignite.internal.binary.BinarySerialiedFieldComparatorSelfTest;
import org.apache.ignite.internal.binary.BinarySimpleNameTestPropertySelfTest;
import org.apache.ignite.internal.binary.BinaryTreeSelfTest;
import org.apache.ignite.internal.binary.GridBinaryAffinityKeySelfTest;
@@ -89,6 +93,12 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite {
suite.addTestSuite(BinaryTreeSelfTest.class);
suite.addTestSuite(BinaryMarshallerSelfTest.class);
+
+ suite.addTestSuite(BinarySerialiedFieldComparatorSelfTest.class);
+ suite.addTestSuite(BinaryArrayIdentityResolverSelfTest.class);
+ suite.addTestSuite(BinaryFieldIdentityResolverSelfTest.class);
+ suite.addTestSuite(BinaryIdentityResolverConfigurationSelfTest.class);
+
suite.addTestSuite(BinaryConfigurationConsistencySelfTest.class);
suite.addTestSuite(GridBinaryMarshallerCtxDisabledSelfTest.class);
suite.addTestSuite(BinaryObjectBuilderDefaultMappersSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
new file mode 100644
index 0000000..8dcba2f
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Two step map-reduce style query.
+ */
+public class GridCacheTwoStepQuery {
+ /** */
+ public static final int DFLT_PAGE_SIZE = 1000;
+
+ /** */
+ @GridToStringInclude
+ private List<GridCacheSqlQuery> mapQrys = new ArrayList<>();
+
+ /** */
+ @GridToStringInclude
+ private GridCacheSqlQuery rdc;
+
+ /** */
+ private int pageSize = DFLT_PAGE_SIZE;
+
+ /** */
+ private boolean explain;
+
+ /** */
+ private Collection<String> spaces;
+
+ /** */
+ private Set<String> schemas;
+
+ /** */
+ private Set<String> tbls;
+
+ /** */
+ private boolean distributedJoins;
+
+ /** */
+ private boolean skipMergeTbl;
+
+ /** */
+ private List<Integer> caches;
+
+ /** */
+ private List<Integer> extraCaches;
+
+ /**
+ * @param schemas Schema names in query.
+ * @param tbls Tables in query.
+ */
+ public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) {
+ this.schemas = schemas;
+ this.tbls = tbls;
+ }
+
+ /**
+ * Specify if distributed joins are enabled for this query.
+ *
+ * @param distributedJoins Distributed joins enabled.
+ */
+ public void distributedJoins(boolean distributedJoins) {
+ this.distributedJoins = distributedJoins;
+ }
+
+ /**
+ * Check if distributed joins are enabled for this query.
+ *
+ * @return {@code true} If distributed joind enabled.
+ */
+ public boolean distributedJoins() {
+ return distributedJoins;
+ }
+
+
+ /**
+ * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
+ */
+ public boolean skipMergeTable() {
+ return skipMergeTbl;
+ }
+
+ /**
+ * @param skipMergeTbl Skip merge table.
+ */
+ public void skipMergeTable(boolean skipMergeTbl) {
+ this.skipMergeTbl = skipMergeTbl;
+ }
+
+ /**
+ * @return If this is explain query.
+ */
+ public boolean explain() {
+ return explain;
+ }
+
+ /**
+ * @param explain If this is explain query.
+ */
+ public void explain(boolean explain) {
+ this.explain = explain;
+ }
+
+ /**
+ * @param pageSize Page size.
+ */
+ public void pageSize(int pageSize) {
+ this.pageSize = pageSize;
+ }
+
+ /**
+ * @return Page size.
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @param qry SQL Query.
+ * @return {@code this}.
+ */
+ public GridCacheTwoStepQuery addMapQuery(GridCacheSqlQuery qry) {
+ mapQrys.add(qry);
+
+ return this;
+ }
+
+ /**
+ * @return Reduce query.
+ */
+ public GridCacheSqlQuery reduceQuery() {
+ return rdc;
+ }
+
+ /**
+ * @param rdc Reduce query.
+ */
+ public void reduceQuery(GridCacheSqlQuery rdc) {
+ this.rdc = rdc;
+ }
+
+ /**
+ * @return Map queries.
+ */
+ public List<GridCacheSqlQuery> mapQueries() {
+ return mapQrys;
+ }
+
+ /**
+ * @return Caches.
+ */
+ public List<Integer> caches() {
+ return caches;
+ }
+
+ /**
+ * @param caches Caches.
+ */
+ public void caches(List<Integer> caches) {
+ this.caches = caches;
+ }
+
+ /**
+ * @return Caches.
+ */
+ public List<Integer> extraCaches() {
+ return extraCaches;
+ }
+
+ /**
+ * @param extraCaches Caches.
+ */
+ public void extraCaches(List<Integer> extraCaches) {
+ this.extraCaches = extraCaches;
+ }
+
+ /**
+ * @return Spaces.
+ */
+ public Collection<String> spaces() {
+ return spaces;
+ }
+
+ /**
+ * @param spaces Spaces.
+ */
+ public void spaces(Collection<String> spaces) {
+ this.spaces = spaces;
+ }
+
+ /**
+ * @return Schemas.
+ */
+ public Set<String> schemas() {
+ return schemas;
+ }
+
+ /**
+ * @param args New arguments to copy with.
+ * @return Copy.
+ */
+ public GridCacheTwoStepQuery copy(Object[] args) {
+ assert !explain;
+
+ GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls);
+
+ cp.caches = caches;
+ cp.extraCaches = extraCaches;
+ cp.spaces = spaces;
+ cp.rdc = rdc.copy(args);
+ cp.skipMergeTbl = skipMergeTbl;
+ cp.pageSize = pageSize;
+ cp.distributedJoins = distributedJoins;
+
+ for (int i = 0; i < mapQrys.size(); i++)
+ cp.mapQrys.add(mapQrys.get(i).copy(args));
+
+ return cp;
+ }
+
+ /**
+ * @return Tables.
+ */
+ public Set<String> tables() {
+ return tbls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTwoStepQuery.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
new file mode 100644
index 0000000..7634965
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -0,0 +1,1027 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryArrayIdentityResolver;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
+import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments;
+import org.apache.ignite.internal.processors.query.h2.dml.KeyValueSupplier;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.h2.command.Prepared;
+import org.h2.jdbc.JdbcPreparedStatement;
+import org.h2.table.Column;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+
+/**
+ *
+ */
+public class DmlStatementsProcessor {
+ /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
+ private final static int DFLT_DML_RERUN_ATTEMPTS = 4;
+
+ /** Indexing. */
+ private final IgniteH2Indexing indexing;
+
+ /** Set of binary type ids for which warning about missing identity in configuration has been printed. */
+ private final static Set<Integer> WARNED_TYPES =
+ Collections.newSetFromMap(new ConcurrentHashMap8<Integer, Boolean>());
+
+ /** Default size for update plan cache. */
+ private static final int PLAN_CACHE_SIZE = 1024;
+
+ /** Update plans cache. */
+ private final ConcurrentMap<String, ConcurrentMap<String, UpdatePlan>> planCache = new ConcurrentHashMap<>();
+
+ /** Dummy metadata for update result. */
+ private final static List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
+ singletonList(new IgniteH2Indexing.SqlFieldMetadata(null, null, "UPDATED", Long.class.getName()));
+
+ /**
+ * @param indexing indexing.
+ */
+ DmlStatementsProcessor(IgniteH2Indexing indexing) {
+ this.indexing = indexing;
+ }
+
+ /**
+ * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
+ *
+ * @param spaceName Space name.
+ * @param stmt JDBC statement.
+ * @param fieldsQry Original query.
+ * @param loc Query locality flag.
+ * @param filters Space name and key filter.
+ * @param cancel Cancel.
+ * @return Update result (modified items count and failed keys).
+ * @throws IgniteCheckedException if failed.
+ */
+ private long updateSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
+ boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
+ Object[] errKeys = null;
+
+ long items = 0;
+
+ UpdatePlan plan = getPlanForStatement(spaceName, stmt, null);
+
+ for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) {
+ UpdateResult r = executeUpdateStatement(plan.tbl.rowDescriptor().context(), stmt, fieldsQry, loc, filters,
+ cancel, errKeys);
+
+ if (F.isEmpty(r.errKeys))
+ return r.cnt + items;
+ else {
+ items += r.cnt;
+ errKeys = r.errKeys;
+ }
+ }
+
+ throw new IgniteSQLException("Failed to update or delete some keys: " + Arrays.deepToString(errKeys),
+ IgniteQueryErrorCode.CONCURRENT_UPDATE);
+ }
+
+ /**
+ * @param spaceName Space name.
+ * @param stmt Prepared statement.
+ * @param fieldsQry Initial query.
+ * @param cancel Query cancel.
+ * @return Update result wrapped into {@link GridQueryFieldsResult}
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings("unchecked")
+ QueryCursorImpl<List<?>> updateSqlFieldsTwoStep(String spaceName, PreparedStatement stmt,
+ SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
+ long res = updateSqlFields(spaceName, stmt, fieldsQry, false, null, cancel);
+
+ return cursorForUpdateResult(res);
+ }
+
+ /**
+ * Execute DML statement on local cache.
+ * @param spaceName Space name.
+ * @param stmt Prepared statement.
+ * @param filters Space name and key filter.
+ * @param cancel Query cancel.
+ * @return Update result wrapped into {@link GridQueryFieldsResult}
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings("unchecked")
+ GridQueryFieldsResult updateLocalSqlFields(String spaceName, PreparedStatement stmt,
+ SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
+ long res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel);
+
+ return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
+ new IgniteSingletonIterator(Collections.singletonList(res)));
+ }
+
+ /**
+ * Actually perform SQL DML operation locally.
+ * @param cctx Cache context.
+ * @param prepStmt Prepared statement for DML query.
+ * @param filters Space name and key filter.
+ * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction.
+ * @return Pair [number of successfully processed items; keys that have failed to be processed]
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private UpdateResult executeUpdateStatement(final GridCacheContext cctx, PreparedStatement prepStmt,
+ SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys)
+ throws IgniteCheckedException {
+ Integer errKeysPos = null;
+
+ if (!F.isEmpty(failedKeys))
+ errKeysPos = F.isEmpty(fieldsQry.getArgs()) ? 1 : fieldsQry.getArgs().length + 1;
+
+ UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos);
+
+ Object[] params = fieldsQry.getArgs();
+
+ if (plan.fastUpdateArgs != null) {
+ assert F.isEmpty(failedKeys) && errKeysPos == null;
+
+ return new UpdateResult(doSingleUpdate(plan, params), X.EMPTY_OBJECT_ARRAY);
+ }
+
+ assert !F.isEmpty(plan.selectQry);
+
+ QueryCursorImpl<List<?>> cur;
+
+ // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
+ // subquery and not some dummy stuff like "select 1, 2, 3;"
+ if (!loc && !plan.isLocSubqry) {
+ SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated())
+ .setArgs(params)
+ .setDistributedJoins(fieldsQry.isDistributedJoins())
+ .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
+ .setLocal(fieldsQry.isLocal())
+ .setPageSize(fieldsQry.getPageSize())
+ .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
+
+ cur = (QueryCursorImpl<List<?>>) indexing.queryTwoStep(cctx, newFieldsQry, cancel);
+ }
+ else {
+ final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params),
+ filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
+
+ cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ try {
+ return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, cancel);
+
+ cur.fieldsMeta(res.metaData());
+ }
+
+ int pageSize = loc ? 0 : fieldsQry.getPageSize();
+
+ switch (plan.mode) {
+ case MERGE:
+ return new UpdateResult(doMerge(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY);
+
+ case INSERT:
+ return new UpdateResult(doInsert(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY);
+
+ case UPDATE:
+ return doUpdate(plan, cur, pageSize);
+
+ case DELETE:
+ return doDelete(cctx, cur, pageSize);
+
+ default:
+ throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode + ']',
+ IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+ }
+ }
+
+ /**
+ * Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args,
+ * if available.
+ * @param spaceName Space name.
+ * @param prepStmt JDBC statement.
+ * @return Update plan.
+ */
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ private UpdatePlan getPlanForStatement(String spaceName, PreparedStatement prepStmt,
+ @Nullable Integer errKeysPos) throws IgniteCheckedException {
+ Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement) prepStmt);
+
+ spaceName = F.isEmpty(spaceName) ? "default" : spaceName;
+
+ ConcurrentMap<String, UpdatePlan> spacePlans = planCache.get(spaceName);
+
+ if (spacePlans == null) {
+ spacePlans = new GridBoundedConcurrentLinkedHashMap<>(PLAN_CACHE_SIZE);
+
+ spacePlans = U.firstNotNull(planCache.putIfAbsent(spaceName, spacePlans), spacePlans);
+ }
+
+ // getSQL returns field value, so it's fast
+ // Don't look for re-runs in cache, we don't cache them
+ UpdatePlan res = (errKeysPos == null ? spacePlans.get(p.getSQL()) : null);
+
+ if (res != null)
+ return res;
+
+ res = UpdatePlanBuilder.planForStatement(p, errKeysPos);
+
+ // Don't cache re-runs
+ if (errKeysPos == null)
+ return U.firstNotNull(spacePlans.putIfAbsent(p.getSQL(), res), res);
+ else
+ return res;
+ }
+
+ /**
+ * Perform single cache operation based on given args.
+ * @param params Query parameters.
+ * @return 1 if an item was affected, 0 otherwise.
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ private static long doSingleUpdate(UpdatePlan plan, Object[] params) throws IgniteCheckedException {
+ GridCacheContext cctx = plan.tbl.rowDescriptor().context();
+
+ FastUpdateArguments singleUpdate = plan.fastUpdateArgs;
+
+ assert singleUpdate != null;
+
+ int res;
+
+ Object key = singleUpdate.key.apply(params);
+ Object val = singleUpdate.val.apply(params);
+ Object newVal = singleUpdate.newVal.apply(params);
+
+ if (newVal != null) { // Single item UPDATE
+ if (val == null) // No _val bound in source query
+ res = cctx.cache().replace(key, newVal) ? 1 : 0;
+ else
+ res = cctx.cache().replace(key, val, newVal) ? 1 : 0;
+ }
+ else { // Single item DELETE
+ if (val == null) // No _val bound in source query
+ res = cctx.cache().remove(key) ? 1 : 0;
+ else
+ res = cctx.cache().remove(key, val) ? 1 : 0;
+ }
+
+ return res;
+ }
+
+ /**
+ * Perform DELETE operation on top of results of SELECT.
+ * @param cctx Cache context.
+ * @param cursor SELECT results.
+ * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+ * @return Results of DELETE (number of items affected AND keys that failed to be updated).
+ */
+ @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
+ private UpdateResult doDelete(GridCacheContext cctx, QueryCursorImpl<List<?>> cursor, int pageSize)
+ throws IgniteCheckedException {
+ // With DELETE, we have only two columns - key and value.
+ long res = 0;
+
+ CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+ // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+ if (cctx.binaryMarshaller()) {
+ CacheOperationContext newOpCtx = null;
+
+ if (opCtx == null)
+ // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+ newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
+ else if (!opCtx.isKeepBinary())
+ newOpCtx = opCtx.keepBinary();
+
+ if (newOpCtx != null)
+ cctx.operationContextPerCall(newOpCtx);
+ }
+
+ // Keys that failed to DELETE due to concurrent updates.
+ List<Object> failedKeys = new ArrayList<>();
+
+ SQLException resEx = null;
+
+ try {
+ Iterator<List<?>> it = cursor.iterator();
+ Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
+
+ while (it.hasNext()) {
+ List<?> e = it.next();
+ if (e.size() != 2) {
+ U.warn(indexing.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
+ continue;
+ }
+
+ rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV));
+
+ if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
+ PageProcessingResult pageRes = processPage(cctx, rows);
+
+ res += pageRes.cnt;
+
+ failedKeys.addAll(F.asList(pageRes.errKeys));
+
+ if (pageRes.ex != null) {
+ if (resEx == null)
+ resEx = pageRes.ex;
+ else
+ resEx.setNextException(pageRes.ex);
+ }
+
+ if (it.hasNext())
+ rows.clear(); // No need to clear after the last batch.
+ }
+ }
+
+ if (resEx != null) {
+ if (!F.isEmpty(failedKeys)) {
+ // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+ // had been modified concurrently right away.
+ String msg = "Failed to DELETE some keys because they had been modified concurrently " +
+ "[keys=" + failedKeys + ']';
+
+ SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+ conEx.setNextException(resEx);
+
+ resEx = conEx;
+ }
+
+ throw new IgniteSQLException(resEx);
+ }
+ }
+ finally {
+ cctx.operationContextPerCall(opCtx);
+ }
+
+ return new UpdateResult(res, failedKeys.toArray());
+ }
+
+ /**
+ * Perform UPDATE operation on top of results of SELECT.
+ * @param cursor SELECT results.
+ * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+ * @return Pair [cursor corresponding to results of UPDATE (contains number of items affected); keys whose values
+ * had been modified concurrently (arguments for a re-run)].
+ */
+ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+ private UpdateResult doUpdate(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize)
+ throws IgniteCheckedException {
+ GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+
+ GridCacheContext cctx = desc.context();
+
+ boolean bin = cctx.binaryMarshaller();
+
+ String[] updatedColNames = plan.colNames;
+
+ int valColIdx = plan.valColIdx;
+
+ boolean hasNewVal = (valColIdx != -1);
+
+ // Statement updates distinct properties if it does not have _val in updated columns list
+ // or if its list of updated columns includes only _val, i.e. is single element.
+ boolean hasProps = !hasNewVal || updatedColNames.length > 1;
+
+ long res = 0;
+
+ CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+ // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+ if (cctx.binaryMarshaller()) {
+ CacheOperationContext newOpCtx = null;
+
+ if (opCtx == null)
+ // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+ newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
+ else if (!opCtx.isKeepBinary())
+ newOpCtx = opCtx.keepBinary();
+
+ if (newOpCtx != null)
+ cctx.operationContextPerCall(newOpCtx);
+ }
+
+ Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
+
+ // Keys that failed to UPDATE due to concurrent updates.
+ List<Object> failedKeys = new ArrayList<>();
+
+ SQLException resEx = null;
+
+ try {
+ Iterator<List<?>> it = cursor.iterator();
+
+ while (it.hasNext()) {
+ List<?> e = it.next();
+ Object key = e.get(0);
+ Object val = (hasNewVal ? e.get(valColIdx) : e.get(1));
+
+ Object newVal;
+
+ Map<String, Object> newColVals = new HashMap<>();
+
+ for (int i = 0; i < plan.colNames.length; i++) {
+ if (hasNewVal && i == valColIdx - 2)
+ continue;
+
+ newColVals.put(plan.colNames[i], e.get(i + 2));
+ }
+
+ newVal = plan.valSupplier.apply(e);
+
+ if (bin && !(val instanceof BinaryObject))
+ val = cctx.grid().binary().toBinary(val);
+
+ // Skip key and value - that's why we start off with 2nd column
+ for (int i = 0; i < plan.tbl.getColumns().length - 2; i++) {
+ Column c = plan.tbl.getColumn(i + 2);
+
+ boolean hasNewColVal = newColVals.containsKey(c.getName());
+
+ // Binary objects get old field values from the Builder, so we can skip what we're not updating
+ if (bin && !hasNewColVal)
+ continue;
+
+ Object colVal = hasNewColVal ? newColVals.get(c.getName()) : desc.columnValue(key, val, i);
+
+ desc.setColumnValue(key, newVal, colVal, i);
+ }
+
+ if (bin && hasProps) {
+ assert newVal instanceof BinaryObjectBuilder;
+
+ newVal = ((BinaryObjectBuilder) newVal).build();
+ }
+
+ Object srcVal = e.get(1);
+
+ if (bin && !(srcVal instanceof BinaryObject))
+ srcVal = cctx.grid().binary().toBinary(srcVal);
+
+ rows.put(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal)));
+
+ if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
+ PageProcessingResult pageRes = processPage(cctx, rows);
+
+ res += pageRes.cnt;
+
+ failedKeys.addAll(F.asList(pageRes.errKeys));
+
+ if (pageRes.ex != null) {
+ if (resEx == null)
+ resEx = pageRes.ex;
+ else
+ resEx.setNextException(pageRes.ex);
+ }
+
+ if (it.hasNext())
+ rows.clear(); // No need to clear after the last batch.
+ }
+ }
+
+ if (resEx != null) {
+ if (!F.isEmpty(failedKeys)) {
+ // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+ // had been modified concurrently right away.
+ String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
+ "[keys=" + failedKeys + ']';
+
+ SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+ dupEx.setNextException(resEx);
+
+ resEx = dupEx;
+ }
+
+ throw new IgniteSQLException(resEx);
+ }
+ }
+ finally {
+ cctx.operationContextPerCall(opCtx);
+ }
+
+ return new UpdateResult(res, failedKeys.toArray());
+ }
+
+ /**
+ * Process errors of entry processor - split the keys into duplicated/concurrently modified and those whose
+ * processing yielded an exception.
+ *
+ * @param res Result of {@link GridCacheAdapter#invokeAll)}
+ * @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is
+ * null if all keys are duplicates/concurrently modified ones).
+ */
+ private static PageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) {
+ Set<Object> errKeys = new LinkedHashSet<>(res.keySet());
+
+ SQLException currSqlEx = null;
+
+ SQLException firstSqlEx = null;
+
+ int errors = 0;
+
+ // Let's form a chain of SQL exceptions
+ for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : res.entrySet()) {
+ try {
+ e.getValue().get();
+ }
+ catch (EntryProcessorException ex) {
+ SQLException next = createJdbcSqlException("Failed to process key '" + e.getKey() + '\'',
+ IgniteQueryErrorCode.ENTRY_PROCESSING);
+
+ next.initCause(ex);
+
+ if (currSqlEx != null)
+ currSqlEx.setNextException(next);
+ else
+ firstSqlEx = next;
+
+ currSqlEx = next;
+
+ errKeys.remove(e.getKey());
+
+ errors++;
+ }
+ }
+
+ return new PageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors);
+ }
+
+ /**
+ * Execute MERGE statement plan.
+ * @param cursor Cursor to take inserted data from.
+ * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations.
+ * @return Number of items affected.
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings("unchecked")
+ private long doMerge(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+ GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+
+ GridCacheContext cctx = desc.context();
+
+ // If we have just one item to put, just do so
+ if (plan.rowsNum == 1) {
+ IgniteBiTuple t = rowToKeyValue(cctx, cursor.iterator().next().toArray(), plan.colNames, plan.keySupplier,
+ plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
+
+ cctx.cache().put(t.getKey(), t.getValue());
+ return 1;
+ }
+ else {
+ int resCnt = 0;
+ Map<Object, Object> rows = new LinkedHashMap<>();
+
+ for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) {
+ List<?> row = it.next();
+
+ IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.keySupplier, plan.valSupplier,
+ plan.keyColIdx, plan.valColIdx, desc.type());
+
+ rows.put(t.getKey(), t.getValue());
+
+ if ((pageSize > 0 && rows.size() == pageSize) || !it.hasNext()) {
+ cctx.cache().putAll(rows);
+ resCnt += rows.size();
+
+ if (it.hasNext())
+ rows.clear();
+ }
+ }
+
+ return resCnt;
+ }
+ }
+
+ /**
+ * Execute INSERT statement plan.
+ * @param cursor Cursor to take inserted data from.
+ * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+ * @return Number of items affected.
+ * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
+ */
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ private long doInsert(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+ GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+
+ GridCacheContext cctx = desc.context();
+
+ // If we have just one item to put, just do so
+ if (plan.rowsNum == 1) {
+ IgniteBiTuple t = rowToKeyValue(cctx, cursor.iterator().next().toArray(), plan.colNames, plan.keySupplier,
+ plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
+
+ if (cctx.cache().putIfAbsent(t.getKey(), t.getValue()))
+ return 1;
+ else
+ throw new IgniteSQLException("Duplicate key during INSERT [key=" + t.getKey() + ']',
+ IgniteQueryErrorCode.DUPLICATE_KEY);
+ }
+ else {
+ CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+ // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+ if (cctx.binaryMarshaller()) {
+ CacheOperationContext newOpCtx = null;
+
+ if (opCtx == null)
+ // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+ newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
+ else if (!opCtx.isKeepBinary())
+ newOpCtx = opCtx.keepBinary();
+
+ if (newOpCtx != null)
+ cctx.operationContextPerCall(newOpCtx);
+ }
+
+ Map<Object, EntryProcessor<Object, Object, Boolean>> rows = plan.isLocSubqry ?
+ new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>(plan.rowsNum) :
+ new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>();
+
+ // Keys that failed to INSERT due to duplication.
+ List<Object> duplicateKeys = new ArrayList<>();
+
+ int resCnt = 0;
+
+ SQLException resEx = null;
+
+ try {
+ Iterator<List<?>> it = cursor.iterator();
+
+ while (it.hasNext()) {
+ List<?> row = it.next();
+
+ final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.keySupplier,
+ plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
+
+ rows.put(t.getKey(), new InsertEntryProcessor(t.getValue()));
+
+ if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) {
+ PageProcessingResult pageRes = processPage(cctx, rows);
+
+ resCnt += pageRes.cnt;
+
+ duplicateKeys.addAll(F.asList(pageRes.errKeys));
+
+ if (pageRes.ex != null) {
+ if (resEx == null)
+ resEx = pageRes.ex;
+ else
+ resEx.setNextException(pageRes.ex);
+ }
+
+ rows.clear();
+ }
+ }
+
+ if (!F.isEmpty(duplicateKeys)) {
+ String msg = "Failed to INSERT some keys because they are already in cache " +
+ "[keys=" + duplicateKeys + ']';
+
+ SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY);
+
+ if (resEx == null)
+ resEx = dupEx;
+ else
+ resEx.setNextException(dupEx);
+ }
+
+ if (resEx != null)
+ throw new IgniteSQLException(resEx);
+
+ return resCnt;
+ }
+ finally {
+ cctx.operationContextPerCall(opCtx);
+ }
+ }
+ }
+
+ /**
+ * Execute given entry processors and collect errors, if any.
+ * @param cctx Cache context.
+ * @param rows Rows to process.
+ * @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently
+ * updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors].
+ * @throws IgniteCheckedException
+ */
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ private static PageProcessingResult processPage(GridCacheContext cctx,
+ Map<Object, EntryProcessor<Object, Object, Boolean>> rows) throws IgniteCheckedException {
+ Map<Object, EntryProcessorResult<Boolean>> res = cctx.cache().invokeAll(rows);
+
+ if (F.isEmpty(res))
+ return new PageProcessingResult(rows.size(), null, null);
+
+ PageProcessingErrorResult splitRes = splitErrors(res);
+
+ int keysCnt = splitRes.errKeys.length;
+
+ return new PageProcessingResult(rows.size() - keysCnt - splitRes.cnt, splitRes.errKeys, splitRes.ex);
+ }
+
+ /**
+ * Convert row presented as an array of Objects into key-value pair to be inserted to cache.
+ *
+ * @param cctx Cache context.
+ * @param row Row to process.
+ * @param cols Query cols.
+ * @param keySupplier Key instantiation method.
+ * @param valSupplier Key instantiation method.
+ * @param keyColIdx Key column index, or {@code -1} if no key column is mentioned in {@code cols}.
+ * @param valColIdx Value column index, or {@code -1} if no value column is mentioned in {@code cols}.
+ * @param desc Table descriptor.
+ * @return Key-value pair.
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings({"unchecked", "ConstantConditions", "ResultOfMethodCallIgnored"})
+ private IgniteBiTuple<?, ?> rowToKeyValue(GridCacheContext cctx, Object[] row, String[] cols,
+ KeyValueSupplier keySupplier, KeyValueSupplier valSupplier, int keyColIdx, int valColIdx,
+ GridQueryTypeDescriptor desc) throws IgniteCheckedException {
+ Object key = keySupplier.apply(F.asList(row));
+ Object val = valSupplier.apply(F.asList(row));
+
+ if (key == null)
+ throw new IgniteSQLException("Key for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_KEY);
+
+ if (val == null)
+ throw new IgniteSQLException("Value for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_VALUE);
+
+ for (int i = 0; i < cols.length; i++) {
+ if (i == keyColIdx || i == valColIdx)
+ continue;
+
+ desc.setValue(cols[i], key, val, row[i]);
+ }
+
+ if (cctx.binaryMarshaller()) {
+ if (key instanceof BinaryObjectBuilder)
+ key = ((BinaryObjectBuilder) key).build();
+
+ if (val instanceof BinaryObjectBuilder)
+ val = ((BinaryObjectBuilder) val).build();
+
+ if (key instanceof BinaryObject)
+ key = updateHashCodeIfNeeded(cctx, (BinaryObject) key);
+
+ if (val instanceof BinaryObject)
+ val = updateHashCodeIfNeeded(cctx, (BinaryObject) val);
+ }
+
+ return new IgniteBiTuple<>(key, val);
+ }
+
+ /**
+ * Set hash code to binary object if it does not have one.
+ *
+ * @param cctx Cache context.
+ * @param binObj Binary object.
+ * @return Binary object with hash code set.
+ */
+ private BinaryObject updateHashCodeIfNeeded(GridCacheContext cctx, BinaryObject binObj) {
+ if (U.isHashCodeEmpty(binObj)) {
+ if (WARNED_TYPES.add(binObj.type().typeId()))
+ U.warn(indexing.getLogger(), "Binary object's type does not have identity resolver explicitly set, therefore " +
+ "BinaryArrayIdentityResolver is used to generate hash codes for its instances, and therefore " +
+ "hash code of this binary object will most likely not match that of its non serialized form. " +
+ "For finer control over identity of this type, please update your BinaryConfiguration accordingly." +
+ " [typeId=" + binObj.type().typeId() + ", typeName=" + binObj.type().typeName() + ']');
+
+ int hash = BinaryArrayIdentityResolver.instance().hashCode(binObj);
+
+ // Empty hash code means no identity set for the type, therefore, we can safely set hash code
+ // via this Builder as it won't be overwritten.
+ return cctx.grid().binary().builder(binObj)
+ .hashCode(hash)
+ .build();
+ }
+ else
+ return binObj;
+ }
+
+ /** */
+ private final static class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+ /** Value to set. */
+ private final Object val;
+
+ /** */
+ private InsertEntryProcessor(Object val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean process(MutableEntry<Object, Object> entry, Object... arguments) throws EntryProcessorException {
+ if (entry.getValue() != null)
+ return false;
+
+ entry.setValue(val);
+ return null; // To leave out only erroneous keys - nulls are skipped on results' processing.
+ }
+ }
+
+ /**
+ * Entry processor invoked by UPDATE and DELETE operations.
+ */
+ private final static class ModifyingEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+ /** Value to expect. */
+ private final Object val;
+
+ /** Action to perform on entry. */
+ private final IgniteInClosure<MutableEntry<Object, Object>> entryModifier;
+
+ /** */
+ private ModifyingEntryProcessor(Object val, IgniteInClosure<MutableEntry<Object, Object>> entryModifier) {
+ this.val = val;
+ this.entryModifier = entryModifier;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean process(MutableEntry<Object, Object> entry, Object... arguments) throws EntryProcessorException {
+ // Something happened to the cache while we were performing map-reduce.
+ if (!F.eq(entry.getValue(), val))
+ return false;
+
+ entryModifier.apply(entry);
+ return null; // To leave out only erroneous keys - nulls are skipped on results' processing.
+ }
+ }
+
+ /** */
+ private static IgniteInClosure<MutableEntry<Object, Object>> RMV = new IgniteInClosure<MutableEntry<Object, Object>>() {
+ /** {@inheritDoc} */
+ @Override public void apply(MutableEntry<Object, Object> e) {
+ e.remove();
+ }
+ };
+
+ /**
+ *
+ */
+ private static final class EntryValueUpdater implements IgniteInClosure<MutableEntry<Object, Object>> {
+ /** Value to set. */
+ private final Object val;
+
+ /** */
+ private EntryValueUpdater(Object val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(MutableEntry<Object, Object> e) {
+ e.setValue(val);
+ }
+ }
+
+ /**
+ * Wrap result of DML operation (number of items affected) to Iterable suitable to be wrapped by cursor.
+ *
+ * @param itemsCnt Update result to wrap.
+ * @return Resulting Iterable.
+ */
+ @SuppressWarnings("unchecked")
+ private static QueryCursorImpl<List<?>> cursorForUpdateResult(long itemsCnt) {
+ QueryCursorImpl<List<?>> res =
+ new QueryCursorImpl(Collections.singletonList(Collections.singletonList(itemsCnt)), null, false);
+
+ res.fieldsMeta(UPDATE_RESULT_META);
+
+ return res;
+ }
+
+ /** Update result - modifications count and keys to re-run query with, if needed. */
+ private final static class UpdateResult {
+ /** Number of processed items. */
+ final long cnt;
+
+ /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. */
+ @NotNull
+ final Object[] errKeys;
+
+ /** */
+ @SuppressWarnings("ConstantConditions")
+ private UpdateResult(long cnt, Object[] errKeys) {
+ this.cnt = cnt;
+ this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+ }
+ }
+
+ /** Result of processing an individual page with {@link IgniteCache#invokeAll} including error details, if any. */
+ private final static class PageProcessingResult {
+ /** Number of successfully processed items. */
+ final long cnt;
+
+ /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. */
+ @NotNull
+ final Object[] errKeys;
+
+ /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */
+ final SQLException ex;
+
+ /** */
+ @SuppressWarnings("ConstantConditions")
+ private PageProcessingResult(long cnt, Object[] errKeys, SQLException ex) {
+ this.cnt = cnt;
+ this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+ this.ex = ex;
+ }
+ }
+
+ /** Result of splitting keys whose processing resulted into an exception from those skipped by
+ * logic of {@link EntryProcessor}s (most likely INSERT duplicates, or UPDATE/DELETE keys whose values
+ * had been modified concurrently), counting and collecting entry processor exceptions.
+ */
+ private final static class PageProcessingErrorResult {
+ /** Keys that failed to be processed by {@link EntryProcessor} (not due to an exception). */
+ @NotNull
+ final Object[] errKeys;
+
+ /** Number of entries whose processing resulted into an exception. */
+ final int cnt;
+
+ /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */
+ final SQLException ex;
+
+ /** */
+ @SuppressWarnings("ConstantConditions")
+ private PageProcessingErrorResult(@NotNull Object[] errKeys, SQLException ex, int exCnt) {
+ errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+ // When exceptions count must be zero, exceptions chain must be not null, and vice versa.
+ assert exCnt == 0 ^ ex != null;
+
+ this.errKeys = errKeys;
+ this.cnt = exCnt;
+ this.ex = ex;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java
index e0680d3..c8c26c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java
@@ -24,6 +24,7 @@ import java.util.NoSuchElementException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -136,7 +137,7 @@ public abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAd
return true;
}
catch (SQLException e) {
- throw new IgniteException(e);
+ throw new IgniteSQLException(e);
}
}