You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2021/11/13 22:39:14 UTC
[geode] branch wip/corrupt-client updated: Add ExportLocalDataFunction and run it from the DUnit test
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch wip/corrupt-client
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/wip/corrupt-client by this push:
new bc07734 Add ExportLocalDataFunction and run it from the DUnit test
bc07734 is described below
commit bc07734bced6fe15432afdfa9a1763301f18a022
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Sat Nov 13 14:37:54 2021 -0800
Add ExportLocalDataFunction and run it from the DUnit test
---
.../org/apache/geode/ExportLocalDataFunction.java | 208 +++++++++++++++++++++
.../geode/pdx/CorruptPdxClientServerDUnitTest.java | 22 ++-
2 files changed, 226 insertions(+), 4 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ExportLocalDataFunction.java b/geode-core/src/distributedTest/java/org/apache/geode/ExportLocalDataFunction.java
new file mode 100644
index 0000000..9a22ccc
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ExportLocalDataFunction.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode;
+
+import static java.lang.Thread.currentThread;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.LogWriter;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.snapshot.RegionSnapshotService;
+import org.apache.geode.cache.snapshot.SnapshotFilter;
+import org.apache.geode.cache.snapshot.SnapshotOptions;
+import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat;
+import org.apache.geode.internal.cache.CachedDeserializable;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ExportLocalDataFunction implements Function<String>, Declarable {
+
+ // Class-level Log4j logger; execute() uses it for its initial "DEBUG: Executing ..." message.
+ private static final Logger logger = LogService.getLogger();
+ // Type-code bytes that the reserializing filter writes into position 0 of a value's serialized
+ // bytes before retrying deserialization as DataSerializable, Serializable and PDX respectively.
+ // NOTE(review): the numeric values presumably mirror Geode's internal DSCODE header bytes —
+ // confirm against the Geode version in use.
+ private static final byte DSCODE_DATA_SERIALIZABLE = 45;
+ private static final byte DSCODE_SERIALIZABLE = 44;
+ private static final byte DSCODE_PDX = 93;
+
+ // Public no-arg constructor; the function holds no per-instance state.
+ // NOTE(review): presumably kept explicit because the class is Declarable and may be
+ // instantiated reflectively — confirm.
+ public ExportLocalDataFunction() {
+ }
+
+ /**
+  * Exports this member's local data for the target region to a snapshot file.
+  *
+  * <p>The function argument is used as the output directory. The file is named
+  * {@code server_<cache name>_region_<region name>_snapshot.gfd} and is written in
+  * {@link SnapshotFormat#GEMFIRE} format. Entries pass through the rejecting filter, so an entry
+  * whose value throws while being read is logged and left out of the snapshot.
+  *
+  * <p>On success {@code true} is sent as the last result. If the export throws, the exception is
+  * handed to the caller through the result sender instead.
+  *
+  * @param context the execution context; it is cast to {@link RegionFunctionContext}, so the
+  *        function has to be executed against a region
+  */
+ @Override
+ public void execute(final FunctionContext<String> context) {
+   logger.info("DEBUG: Executing ExportLocalDataFunction on {}", context.getMemberName());
+
+   final String directoryName = context.getArguments();
+   final Cache cache = context.getCache();
+   final String memberName = cache.getName();
+   // Named logWriter so it does not shadow the class-level Log4j logger used above.
+   final LogWriter logWriter = cache.getLogger();
+
+   // Restrict the export to the data hosted by this member.
+   final RegionFunctionContext regionContext = (RegionFunctionContext) context;
+   final Region<Object, Object> localData =
+       PartitionRegionHelper.getLocalDataForContext(regionContext);
+
+   final String fileName =
+       "server_" + memberName + "_region_" + localData.getName() + "_snapshot.gfd";
+   final File file = new File(directoryName, fileName);
+
+   final RegionSnapshotService<Object, Object> service = localData.getSnapshotService();
+   try {
+     logWriter.warning(describeExport(localData, file, "started"));
+     final SnapshotOptions<Object, Object> options = service.createOptions();
+     // Alternative: getReserializingFilter(localData, logWriter) retries unreadable values
+     // under different type codes instead of rejecting them outright.
+     options.setFilter(getRejectingFilter(localData, logWriter));
+     service.save(file, SnapshotFormat.GEMFIRE, options);
+     logWriter.warning(describeExport(localData, file, "completed"));
+   } catch (Exception e) {
+     // Report the failure to the caller rather than letting it escape the function.
+     context.getResultSender().sendException(e);
+     return;
+   }
+
+   context.getResultSender().lastResult(true);
+ }
+
+ /**
+  * Builds the progress message logged before and after the export. The region size is read at
+  * call time, so the two messages may report different counts.
+  */
+ private static String describeExport(final Region<?, ?> localData, final File file,
+     final String phase) {
+   return currentThread().getName() + ": Exporting " + localData.size() + " entries in region "
+       + localData.getName() + " to file " + file.getAbsolutePath() + " " + phase;
+ }
+
+ /**
+  * Creates a snapshot filter that accepts an entry only if its value can be read.
+  *
+  * <p>When {@code entry.getValue()} throws, the failure is logged together with whatever raw
+  * value bytes can be recovered, and the entry is rejected.
+  *
+  * @param localData the region being exported; only its name is used, for log messages
+  * @param logger receives a warning for every rejected entry
+  * @return a filter returning {@code true} for readable entries and {@code false} otherwise
+  */
+ private <K, V> SnapshotFilter<K, V> getRejectingFilter(final Region<K, V> localData,
+     final LogWriter logger) {
+   return entry -> {
+     try {
+       // noinspection ResultOfMethodCallIgnored
+       entry.getValue();
+       return true;
+     } catch (Exception e) {
+       // getValueBytes returns null when the raw bytes are not reachable; dereferencing it
+       // here would throw from inside the filter and abort the whole export.
+       final byte[] valueBytes = getValueBytes(entry);
+       final String valueLength =
+           valueBytes == null ? "unknown" : String.valueOf(valueBytes.length);
+       logger.warning("Caught the following exception attempting to deserialize value region="
+           + localData.getName() + "; key=" + entry.getKey() + "; valueLength="
+           + valueLength
+           + "; value=" + Arrays.toString(valueBytes) + ":", e);
+       return false;
+     }
+   };
+ }
+
+ /**
+  * Creates a snapshot filter that tries to salvage entries whose values cannot be read.
+  *
+  * <p>When {@code entry.getValue()} throws, the first byte of the entry's raw value bytes is
+  * overwritten with the DataSerializable, Serializable and PDX type codes in turn, and the read
+  * is retried after each change. The entry is accepted as soon as one retry succeeds and is
+  * rejected if all of them fail or if no raw bytes are available.
+  *
+  * @param localData the region being exported; only its name is used, for log messages
+  * @param logger receives a warning for every failure and every retry outcome
+  * @return a filter returning {@code true} for entries that are readable or were salvaged
+  */
+ private <K, V> SnapshotFilter<K, V> getReserializingFilter(final Region<K, V> localData,
+     final LogWriter logger) {
+   return new SnapshotFilter<K, V>() {
+     @Override
+     public boolean accept(Map.Entry<K, V> entry) {
+       try {
+         // noinspection ResultOfMethodCallIgnored
+         entry.getValue();
+         return true;
+       } catch (Exception e) {
+         final byte[] valueBytes = getValueBytes(entry);
+         final String valueLength =
+             valueBytes == null ? "unknown" : String.valueOf(valueBytes.length);
+         logger.warning("Caught the following exception attempting to deserialize value region="
+             + localData.getName() + "; key=" + entry.getKey() + "; valueLength="
+             + valueLength
+             + "; value=" + Arrays.toString(valueBytes) + ":", e);
+
+         // Without at least one raw byte there is no type code to rewrite.
+         if (valueBytes == null || valueBytes.length == 0) {
+           logger.warning(
+               "Rejecting entry since its serialized value bytes are unavailable region="
+                   + localData.getName() + "; key=" + entry.getKey());
+           return false;
+         }
+
+         // Short-circuit: later type codes are only tried while earlier ones fail.
+         return attemptToDeserialize(entry, valueBytes, "DataSerializable",
+             DSCODE_DATA_SERIALIZABLE)
+             || attemptToDeserialize(entry, valueBytes, "Serializable", DSCODE_SERIALIZABLE)
+             || attemptToDeserialize(entry, valueBytes, "PDX", DSCODE_PDX);
+       }
+     }
+
+     /**
+      * Rewrites the leading type-code byte and retries reading the value.
+      *
+      * @return {@code true} if the value could be read with the given type code
+      */
+     private boolean attemptToDeserialize(final Map.Entry<K, V> entry, final byte[] valueBytes,
+         final String type, final byte typeCode) {
+       logger.warning("Attempting to deserialize as " + type + " value region="
+           + localData.getName() + "; key=" + entry.getKey());
+       final byte originalCode = valueBytes[0];
+       valueBytes[0] = typeCode;
+       try {
+         final Object value = entry.getValue();
+         logger.warning("Accepting entry since the value was successfully deserialized as " + type
+             + " region=" + localData.getName() + "; key=" + entry.getKey() + "; value="
+             + value);
+         return true;
+       } catch (Throwable t) {
+         // Throwable is caught as in the original code. NOTE(review): presumably because
+         // reading corrupt bytes can raise Errors as well as Exceptions — confirm.
+         // Undo the rewrite so a rejected entry is not left carrying a type code that failed.
+         // NOTE(review): assumes valueBytes is the array backing the cached value — confirm.
+         valueBytes[0] = originalCode;
+         logger.warning(
+             "Rejecting entry since the value failed to deserialize as " + type + " region="
+                 + localData.getName() + "; key=" + entry.getKey(),
+             t);
+         return false;
+       }
+     }
+   };
+ }
+
+ private static <K, V> byte[] getValueBytes(final Map.Entry<K, V> entry) {
+ byte[] valueBytes = null;
+ if (entry instanceof EntrySnapshot) {
+ EntrySnapshot es = (EntrySnapshot) entry;
+ RegionEntry re = es.getRegionEntry();
+ Object valueInVm = re.getValueInVM(null);
+ if (valueInVm instanceof CachedDeserializable) {
+ Object cdValue = ((CachedDeserializable) valueInVm).getValue();
+ if (cdValue instanceof byte[]) {
+ valueBytes = (byte[]) cdValue;
+ }
+ }
+ }
+ return valueBytes;
+ }
+
+ /**
+  * Returns the identifier of this function.
+  *
+  * @return the simple name of the runtime class
+  */
+ @Override
+ public String getId() {
+   return getClass().getSimpleName();
+ }
+
+ /**
+  * Always returns {@code true}.
+  * NOTE(review): presumably this routes execution to the members holding the primary copy of
+  * the data — confirm against the Geode {@code Function} contract.
+  */
+ @Override
+ public boolean optimizeForWrite() {
+   return true;
+ }
+
+ /**
+  * Always returns {@code true}: {@code execute} sends either a last result or an exception
+  * through the result sender.
+  */
+ @Override
+ public boolean hasResult() {
+   return true;
+ }
+
+ /**
+  * Always returns {@code false}.
+  * NOTE(review): presumably this means a failed execution is not retried on another member —
+  * confirm against the Geode {@code Function} contract.
+  */
+ @Override
+ public boolean isHA() {
+   return false;
+ }
+
+ /**
+  * {@link Declarable} initialization hook; intentionally a no-op because the function takes no
+  * configuration.
+  *
+  * @param properties ignored
+  */
+ @Override
+ public void init(Properties properties) {
+ }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/pdx/CorruptPdxClientServerDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/pdx/CorruptPdxClientServerDUnitTest.java
index 550c90d..0c90eb1 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/pdx/CorruptPdxClientServerDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/pdx/CorruptPdxClientServerDUnitTest.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.pdx;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
import static org.assertj.core.api.Assertions.assertThat;
@@ -21,16 +23,20 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOError;
import java.io.IOException;
+import java.util.Properties;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.ExportLocalDataFunction;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.pdx.internal.PdxWriterImpl;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.rules.ClientVM;
@@ -53,8 +59,13 @@ public class CorruptPdxClientServerDUnitTest {
public void testSimplePut() throws Exception {
final MemberVM locator = cluster.startLocatorVM(0);
final int locatorPort = locator.getPort();
- final MemberVM server1 = cluster.startServerVM(1, locatorPort);
- final MemberVM server2 = cluster.startServerVM(2, locatorPort);
+
+ Properties properties = new Properties();
+ properties.put(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.**");
+ properties.put(LOCATORS, String.format("localhost[%d]", locatorPort));
+
+ final MemberVM server1 = cluster.startServerVM(1, properties);
+ final MemberVM server2 = cluster.startServerVM(2, properties);
final ClientVM client = cluster.startClientVM(3,
cf -> cf.withLocatorConnection(locatorPort));
@@ -93,8 +104,11 @@ public class CorruptPdxClientServerDUnitTest {
client.invoke(() -> {
final Region<Object, Object> region = getClientCache().getRegion(REGION_NAME);
- region.remove(3);
- region.destroy(4);
+ ResultCollector resultCollector = FunctionService.onRegion(region).execute(new ExportLocalDataFunction());
+ resultCollector.getResult();
+
+// region.remove(3);
+// region.destroy(4);
});
}