You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/28 11:52:51 UTC

[20/50] [abbrv] ignite git commit: IGNITE-9937 Primary response error can be lost due to unwrapping a key - Fixes #5078.

IGNITE-9937 Primary response error can be lost due to unwrapping a key - Fixes #5078.

Signed-off-by: Pavel Kovalenko <jo...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e1d1783
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e1d1783
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e1d1783

Branch: refs/heads/ignite-9720
Commit: 7e1d17830429a78ca62e2f007fece7de6466eb0f
Parents: e8eeea3
Author: Roman Guseinov <gr...@gmail.com>
Authored: Mon Nov 26 16:57:47 2018 +0300
Committer: Pavel Kovalenko <jo...@gmail.com>
Committed: Mon Nov 26 16:57:47 2018 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          |  30 ++++-
 .../GridNearAtomicAbstractUpdateFuture.java     |  43 ++++++-
 .../cache/store/CacheStoreWriteErrorTest.java   | 127 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 4 files changed, 196 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 86d7b3c..74be8e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -34,6 +34,7 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryInvalidTypeException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -2725,6 +2726,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             final GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture();
 
+            Collection<Object> failedToUnwrapKeys = null;
+
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -2737,9 +2740,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     continue;
                 }
 
-                if (storeErr != null &&
-                    storeErr.failedKeys().contains(entry.key().value(ctx.cacheObjectContext(), false)))
-                    continue;
+                if (storeErr != null) {
+                    Object key = entry.key();
+
+                    try {
+                        key = entry.key().value(ctx.cacheObjectContext(), false);
+                    }
+                    catch (BinaryInvalidTypeException e) {
+                        if (log.isDebugEnabled()) {
+                            if (failedToUnwrapKeys == null)
+                                failedToUnwrapKeys = new ArrayList<>();
+
+                            // To limit keys count in log message.
+                            if (failedToUnwrapKeys.size() < 5)
+                                failedToUnwrapKeys.add(key);
+                        }
+                    }
+
+                    if (storeErr.failedKeys().contains(key))
+                        continue;
+                }
 
                 try {
                     // We are holding java-level locks on entries at this point.
@@ -2868,6 +2888,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 dhtUpdRes.processedEntriesCount(firstEntryIdx + i + 1);
             }
 
+            if (failedToUnwrapKeys != null) {
+                log.warning("Failed to get values of keys: " + failedToUnwrapKeys +
+                    " (the binary objects will be used instead).");
+            }
         }
         catch (IgniteCheckedException e) {
             res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index f91e3f3..983b094 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryInvalidTypeException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -399,10 +400,46 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
 
         Collection<Object> keys = new ArrayList<>(keys0.size());
 
-        for (KeyCacheObject key : keys0)
-            keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+        Collection<Object> failedToUnwrapKeys = null;
 
-        err.add(keys, res.error(), req.topologyVersion());
+        Exception suppressedErr = null;
+
+        for (KeyCacheObject key : keys0) {
+            try {
+                keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+            }
+            catch (BinaryInvalidTypeException e) {
+                keys.add(cctx.toCacheKeyObject(key));
+
+                if (log.isDebugEnabled()) {
+                    if (failedToUnwrapKeys == null)
+                        failedToUnwrapKeys = new ArrayList<>();
+
+                    // To limit keys count in log message.
+                    if (failedToUnwrapKeys.size() < 5)
+                        failedToUnwrapKeys.add(key);
+                }
+
+                suppressedErr = e;
+            }
+            catch (Exception e) {
+                keys.add(cctx.toCacheKeyObject(key));
+
+                suppressedErr = e;
+            }
+        }
+
+        if (failedToUnwrapKeys != null) {
+            log.warning("Failed to unwrap keys: " + failedToUnwrapKeys +
+                " (the binary objects will be used instead).");
+        }
+
+        IgniteCheckedException error = res.error();
+
+        if (suppressedErr != null)
+            error.addSuppressed(suppressedErr);
+
+        err.add(keys, error, req.topologyVersion());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java
new file mode 100644
index 0000000..fce1f5d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import com.google.common.base.Throwables;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * This class tests handling exceptions from {@link CacheStore#write(Cache.Entry)}.
+ */
+public class CacheStoreWriteErrorTest extends GridCommonAbstractTest {
+    /** */
+    public static final String CACHE_NAME = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        CacheConfiguration cacheCfg = new CacheConfiguration(CACHE_NAME)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC)
+            .setCacheStoreFactory(FactoryBuilder.factoryOf(ThrowableCacheStore.class))
+            .setWriteThrough(true)
+            .setStoreKeepBinary(true);
+
+        return super.getConfiguration(gridName)
+            .setCacheConfiguration(cacheCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Checks primary error while saving batch with one entry.
+     */
+    public void testPrimaryErrorForBatchSize1() {
+        checkPrimaryError(1);
+    }
+
+    /**
+     * Checks primary error while saving batch with two entries.
+     */
+    public void testPrimaryErrorForBatchSize2() {
+        checkPrimaryError(2);
+    }
+
+    /**
+     * Checks that primary error ({@link CacheWriterException}) is not lost due to unwrapping a key.
+     *
+     * @param batchSize Batch size.
+     */
+    private void checkPrimaryError(int batchSize) {
+        Throwable t = GridTestUtils.assertThrows(log,
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try (Ignite grid = startGrid()) {
+                        IgniteCache<BinaryObject, String> cache = grid.cache(CACHE_NAME);
+
+                        HashMap<BinaryObject, String> batch = new HashMap<>();
+
+                        for (int i = 0; i < batchSize; i++) {
+                            BinaryObject key = grid.binary().builder("KEY_TYPE_NAME").setField("id", i).build();
+
+                            batch.put(key, "VALUE");
+                        }
+
+                        cache.putAllAsync(batch).get();
+                    }
+
+                    return null;
+                }
+            }, CacheWriterException.class, null);
+
+        assertTrue("Stacktrace should contain the message of the original exception",
+            Throwables.getStackTraceAsString(t).contains(ThrowableCacheStore.EXCEPTION_MESSAGE));
+    }
+
+    /**
+     * {@link CacheStore} implementation which throws {@link RuntimeException} for every write operation.
+     */
+    public static class ThrowableCacheStore extends CacheStoreAdapter<Object, Object> {
+        /** */
+        private static final String EXCEPTION_MESSAGE = "WRITE CACHE STORE EXCEPTION";
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object o) throws CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
+            throw new RuntimeException(EXCEPTION_MESSAGE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object o) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 5d7b306..52e2ba2 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -22,6 +22,7 @@ import junit.framework.TestSuite;
 import org.apache.ignite.cache.IgniteCacheEntryProcessorSequentialCallTest;
 import org.apache.ignite.cache.IgniteWarmupClosureSelfTest;
 import org.apache.ignite.cache.store.CacheStoreReadFromBackupTest;
+import org.apache.ignite.cache.store.CacheStoreWriteErrorTest;
 import org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest;
 import org.apache.ignite.cache.store.GridCacheBalancingStoreSelfTest;
 import org.apache.ignite.cache.store.GridCacheLoadOnlyStoreAdapterSelfTest;
@@ -333,6 +334,7 @@ public class IgniteCacheTestSuite extends TestSuite {
 
         suite.addTestSuite(GridStoreLoadCacheTest.class);
         suite.addTestSuite(CacheStoreReadFromBackupTest.class);
+        suite.addTestSuite(CacheStoreWriteErrorTest.class);
         suite.addTestSuite(CacheTransactionalStoreReadFromBackupTest.class);
 
         //suite.addTestSuite(CacheAtomicSingleMessageCountSelfTest.class);