You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/28 11:52:51 UTC
[20/50] [abbrv] ignite git commit: IGNITE-9937 Primary response error
can be lost due to unwrapping a key - Fixes #5078.
IGNITE-9937 Primary response error can be lost due to unwrapping a key - Fixes #5078.
Signed-off-by: Pavel Kovalenko <jo...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e1d1783
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e1d1783
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e1d1783
Branch: refs/heads/ignite-9720
Commit: 7e1d17830429a78ca62e2f007fece7de6466eb0f
Parents: e8eeea3
Author: Roman Guseinov <gr...@gmail.com>
Authored: Mon Nov 26 16:57:47 2018 +0300
Committer: Pavel Kovalenko <jo...@gmail.com>
Committed: Mon Nov 26 16:57:47 2018 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 30 ++++-
.../GridNearAtomicAbstractUpdateFuture.java | 43 ++++++-
.../cache/store/CacheStoreWriteErrorTest.java | 127 +++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
4 files changed, 196 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 86d7b3c..74be8e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -34,6 +34,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryInvalidTypeException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -2725,6 +2726,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture();
+ Collection<Object> failedToUnwrapKeys = null;
+
// Avoid iterator creation.
for (int i = 0; i < entries.size(); i++) {
GridDhtCacheEntry entry = entries.get(i);
@@ -2737,9 +2740,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
continue;
}
- if (storeErr != null &&
- storeErr.failedKeys().contains(entry.key().value(ctx.cacheObjectContext(), false)))
- continue;
+ if (storeErr != null) {
+ Object key = entry.key();
+
+ try {
+ key = entry.key().value(ctx.cacheObjectContext(), false);
+ }
+ catch (BinaryInvalidTypeException e) {
+ if (log.isDebugEnabled()) {
+ if (failedToUnwrapKeys == null)
+ failedToUnwrapKeys = new ArrayList<>();
+
+ // To limit keys count in log message.
+ if (failedToUnwrapKeys.size() < 5)
+ failedToUnwrapKeys.add(key);
+ }
+ }
+
+ if (storeErr.failedKeys().contains(key))
+ continue;
+ }
try {
// We are holding java-level locks on entries at this point.
@@ -2868,6 +2888,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtUpdRes.processedEntriesCount(firstEntryIdx + i + 1);
}
+ if (failedToUnwrapKeys != null) {
+ log.warning("Failed to get values of keys: " + failedToUnwrapKeys +
+ " (the binary objects will be used instead).");
+ }
}
catch (IgniteCheckedException e) {
res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index f91e3f3..983b094 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryInvalidTypeException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -399,10 +400,46 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
Collection<Object> keys = new ArrayList<>(keys0.size());
- for (KeyCacheObject key : keys0)
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+ Collection<Object> failedToUnwrapKeys = null;
- err.add(keys, res.error(), req.topologyVersion());
+ Exception suppressedErr = null;
+
+ for (KeyCacheObject key : keys0) {
+ try {
+ keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+ }
+ catch (BinaryInvalidTypeException e) {
+ keys.add(cctx.toCacheKeyObject(key));
+
+ if (log.isDebugEnabled()) {
+ if (failedToUnwrapKeys == null)
+ failedToUnwrapKeys = new ArrayList<>();
+
+ // To limit keys count in log message.
+ if (failedToUnwrapKeys.size() < 5)
+ failedToUnwrapKeys.add(key);
+ }
+
+ suppressedErr = e;
+ }
+ catch (Exception e) {
+ keys.add(cctx.toCacheKeyObject(key));
+
+ suppressedErr = e;
+ }
+ }
+
+ if (failedToUnwrapKeys != null) {
+ log.warning("Failed to unwrap keys: " + failedToUnwrapKeys +
+ " (the binary objects will be used instead).");
+ }
+
+ IgniteCheckedException error = res.error();
+
+ if (suppressedErr != null)
+ error.addSuppressed(suppressedErr);
+
+ err.add(keys, error, req.topologyVersion());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java
new file mode 100644
index 0000000..fce1f5d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import com.google.common.base.Throwables;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that an exception thrown from {@link CacheStore#write(Cache.Entry)} is reported to the caller.
+ */
+public class CacheStoreWriteErrorTest extends GridCommonAbstractTest {
+ /** Name of the cache that is configured with the always-failing {@link ThrowableCacheStore}. */
+ public static final String CACHE_NAME = "cache";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ CacheConfiguration cacheCfg = new CacheConfiguration(CACHE_NAME)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC)
+ .setCacheStoreFactory(FactoryBuilder.factoryOf(ThrowableCacheStore.class))
+ .setWriteThrough(true)
+ .setStoreKeepBinary(true);
+
+ return super.getConfiguration(gridName)
+ .setCacheConfiguration(cacheCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Checks that the primary error is reported when saving a batch with one entry.
+ */
+ public void testPrimaryErrorForBatchSize1() {
+ checkPrimaryError(1);
+ }
+
+ /**
+ * Checks that the primary error is reported when saving a batch with two entries.
+ */
+ public void testPrimaryErrorForBatchSize2() {
+ checkPrimaryError(2);
+ }
+
+ /**
+ * Checks that the primary error ({@link CacheWriterException}) is not lost due to unwrapping a key.
+ *
+ * @param batchSize Number of binary-keyed entries written by a single {@code putAllAsync} call.
+ */
+ private void checkPrimaryError(int batchSize) {
+ Throwable t = GridTestUtils.assertThrows(log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (Ignite grid = startGrid()) {
+ IgniteCache<BinaryObject, String> cache = grid.cache(CACHE_NAME);
+
+ HashMap<BinaryObject, String> batch = new HashMap<>();
+
+ for (int i = 0; i < batchSize; i++) {
+ BinaryObject key = grid.binary().builder("KEY_TYPE_NAME").setField("id", i).build();
+
+ batch.put(key, "VALUE");
+ }
+
+ cache.putAllAsync(batch).get();
+ }
+
+ return null;
+ }
+ }, CacheWriterException.class, null);
+
+ assertTrue("Stacktrace should contain the message of the original exception",
+ Throwables.getStackTraceAsString(t).contains(ThrowableCacheStore.EXCEPTION_MESSAGE));
+ }
+
+ /**
+ * {@link CacheStore} implementation that throws a {@link RuntimeException} from every write operation.
+ */
+ public static class ThrowableCacheStore extends CacheStoreAdapter<Object, Object> {
+ /** Message of the exception thrown by {@link #write(Cache.Entry)}; the test looks for it in the stack trace. */
+ private static final String EXCEPTION_MESSAGE = "WRITE CACHE STORE EXCEPTION";
+
+ /** {@inheritDoc} */
+ @Override public Object load(Object o) throws CacheLoaderException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object o) throws CacheWriterException {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 5d7b306..52e2ba2 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -22,6 +22,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.cache.IgniteCacheEntryProcessorSequentialCallTest;
import org.apache.ignite.cache.IgniteWarmupClosureSelfTest;
import org.apache.ignite.cache.store.CacheStoreReadFromBackupTest;
+import org.apache.ignite.cache.store.CacheStoreWriteErrorTest;
import org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest;
import org.apache.ignite.cache.store.GridCacheBalancingStoreSelfTest;
import org.apache.ignite.cache.store.GridCacheLoadOnlyStoreAdapterSelfTest;
@@ -333,6 +334,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(GridStoreLoadCacheTest.class);
suite.addTestSuite(CacheStoreReadFromBackupTest.class);
+ suite.addTestSuite(CacheStoreWriteErrorTest.class);
suite.addTestSuite(CacheTransactionalStoreReadFromBackupTest.class);
//suite.addTestSuite(CacheAtomicSingleMessageCountSelfTest.class);