You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2021/11/13 22:39:14 UTC

[geode] branch wip/corrupt-client updated: Add ExportLocalDataFunction and run it from the DUnit test

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch wip/corrupt-client
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/wip/corrupt-client by this push:
     new bc07734  Add ExportLocalDataFunction and run it from the DUnit test
bc07734 is described below

commit bc07734bced6fe15432afdfa9a1763301f18a022
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Sat Nov 13 14:37:54 2021 -0800

    Add ExportLocalDataFunction and run it from the DUnit test
---
 .../org/apache/geode/ExportLocalDataFunction.java  | 208 +++++++++++++++++++++
 .../geode/pdx/CorruptPdxClientServerDUnitTest.java |  22 ++-
 2 files changed, 226 insertions(+), 4 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ExportLocalDataFunction.java b/geode-core/src/distributedTest/java/org/apache/geode/ExportLocalDataFunction.java
new file mode 100644
index 0000000..9a22ccc
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ExportLocalDataFunction.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode;
+
+import static java.lang.Thread.currentThread;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.LogWriter;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.snapshot.RegionSnapshotService;
+import org.apache.geode.cache.snapshot.SnapshotFilter;
+import org.apache.geode.cache.snapshot.SnapshotOptions;
+import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat;
+import org.apache.geode.internal.cache.CachedDeserializable;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ExportLocalDataFunction implements Function<String>, Declarable {
+
+  private static final Logger logger = LogService.getLogger();
+  private static final byte DSCODE_DATA_SERIALIZABLE = 45;
+  private static final byte DSCODE_SERIALIZABLE = 44;
+  private static final byte DSCODE_PDX = 93;
+
+  public ExportLocalDataFunction() {
+  }
+
+  public void execute(final FunctionContext<String> context) {
+    // Get directory name
+
+    logger.info("DEBUG: Executing ExportLocalDataFunction on {}", context.getMemberName());
+
+    final String directoryName = context.getArguments();
+    final Cache cache = context.getCache();
+    final String memberName = cache.getName();
+    final LogWriter logger = cache.getLogger();
+
+    // Get local data set
+    final RegionFunctionContext rfc = (RegionFunctionContext) context;
+    final Region<Object, Object> localData = PartitionRegionHelper.getLocalDataForContext(rfc);
+
+    // Create the file
+    final String fileName =
+        "server_" + memberName + "_region_" + localData.getName() + "_snapshot.gfd";
+    final File file = new File(directoryName, fileName);
+
+    // Export local data set
+    final RegionSnapshotService<Object, Object> service = localData.getSnapshotService();
+    try {
+      logger.warning(
+          currentThread().getName() + ": Exporting " + localData.size() + " entries in region "
+              + localData.getName() + " to file " + file.getAbsolutePath() + " started");
+      final SnapshotOptions<Object, Object> options = service.createOptions();
+      options.setFilter(getRejectingFilter(localData, logger));
+      //      options.setFilter(getReserializingFilter(localData, logger));
+      service.save(file, SnapshotFormat.GEMFIRE, options);
+      logger.warning(
+          currentThread().getName() + ": Exporting " + localData.size() + " entries in region "
+              + localData.getName() + " to file " + file.getAbsolutePath() + " completed");
+    } catch (Exception e) {
+      context.getResultSender().sendException(e);
+      return;
+    }
+
+    context.getResultSender().lastResult(true);
+  }
+
+  private <K, V> SnapshotFilter<K, V> getRejectingFilter(final Region<K, V> localData, final LogWriter logger) {
+    return entry -> {
+      boolean accept = true;
+      try {
+        //noinspection ResultOfMethodCallIgnored
+        entry.getValue();
+      } catch (Exception e) {
+        final byte[] valueBytes = getValueBytes(entry);
+        logger.warning("Caught the following exception attempting to deserialize value region="
+            + localData.getName() + "; key=" + entry.getKey() + "; valueLength="
+            + valueBytes.length
+            + "; value=" + Arrays.toString(valueBytes) + ":", e);
+        accept = false;
+      }
+      return accept;
+    };
+  }
+
+  private <K, V> SnapshotFilter<K, V> getReserializingFilter(final Region<K, V> localData,
+                                                             final LogWriter logger) {
+    return new SnapshotFilter<K, V>() {
+      public boolean accept(Map.Entry<K, V> entry) {
+        boolean accept = true;
+        try {
+          //noinspection ResultOfMethodCallIgnored
+          entry.getValue();
+        } catch (Exception e) {
+          final byte[] valueBytes = getValueBytes(entry);
+          logger.warning("Caught the following exception attempting to deserialize value region="
+              + localData.getName() + "; key=" + entry.getKey() + "; valueLength="
+              + valueBytes.length
+              + "; value=" + Arrays.toString(valueBytes) + ":", e);
+          logger.warning(
+              "Attempting to deserialize as DataSerializable value region=" + localData.getName()
+                  + "; key=" + entry.getKey());
+          accept =
+              attemptToDeserialize(entry, valueBytes, logger, "DataSerializable",
+                  DSCODE_DATA_SERIALIZABLE);
+          if (!accept) {
+            logger.warning(
+                "Attempting to deserialize as Serializable value region=" + localData.getName()
+                    + "; key="
+                    + entry.getKey());
+            accept =
+                attemptToDeserialize(entry, valueBytes, logger, "Serializable",
+                    DSCODE_SERIALIZABLE);
+            if (!accept) {
+              logger.warning(
+                  "Attempting to deserialize as PDX value region=" + localData.getName()
+                      + "; key="
+                      + entry.getKey());
+              accept = attemptToDeserialize(entry, valueBytes, logger, "PDX", DSCODE_PDX);
+            }
+          }
+        }
+        return accept;
+      }
+
+      private boolean attemptToDeserialize(final Map.Entry<K, V> entry, final byte[] valueBytes,
+                                           final LogWriter logger,
+                                           final String type, final byte b) {
+        boolean accept = true;
+        valueBytes[0] = b;
+        try {
+          Object value = entry.getValue();
+          logger.warning("Accepting entry since the value was successfully deserialized as " + type
+              + " region=" + localData.getName() + "; key=" + entry.getKey() + "; value="
+              + value);
+        } catch (Throwable e2) {
+          logger.warning(
+              "Rejecting entry since the value failed to deserialize as " + type + " region="
+                  + localData.getName() + "; key=" + entry.getKey());
+          accept = false;
+        }
+        return accept;
+      }
+    };
+  }
+
+  private static <K, V> byte[] getValueBytes(final Map.Entry<K, V> entry) {
+    byte[] valueBytes = null;
+    if (entry instanceof EntrySnapshot) {
+      EntrySnapshot es = (EntrySnapshot) entry;
+      RegionEntry re = es.getRegionEntry();
+      Object valueInVm = re.getValueInVM(null);
+      if (valueInVm instanceof CachedDeserializable) {
+        Object cdValue = ((CachedDeserializable) valueInVm).getValue();
+        if (cdValue instanceof byte[]) {
+          valueBytes = (byte[]) cdValue;
+        }
+      }
+    }
+    return valueBytes;
+  }
+
+  public String getId() {
+    return getClass().getSimpleName();
+  }
+
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  public boolean hasResult() {
+    return true;
+  }
+
+  public boolean isHA() {
+    return false;
+  }
+
+  public void init(Properties properties) {
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/pdx/CorruptPdxClientServerDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/pdx/CorruptPdxClientServerDUnitTest.java
index 550c90d..0c90eb1 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/pdx/CorruptPdxClientServerDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/pdx/CorruptPdxClientServerDUnitTest.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.pdx;
 
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
 import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
 import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -21,16 +23,20 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.IOError;
 import java.io.IOException;
+import java.util.Properties;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.ExportLocalDataFunction;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.pdx.internal.PdxWriterImpl;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.rules.ClientVM;
@@ -53,8 +59,13 @@ public class CorruptPdxClientServerDUnitTest {
   public void testSimplePut() throws Exception {
     final MemberVM locator = cluster.startLocatorVM(0);
     final int locatorPort = locator.getPort();
-    final MemberVM server1 = cluster.startServerVM(1, locatorPort);
-    final MemberVM server2 = cluster.startServerVM(2, locatorPort);
+
+    Properties properties = new Properties();
+    properties.put(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.**");
+    properties.put(LOCATORS, String.format("localhost[%d]", locatorPort));
+
+    final MemberVM server1 = cluster.startServerVM(1, properties);
+    final MemberVM server2 = cluster.startServerVM(2, properties);
     final ClientVM client = cluster.startClientVM(3,
         cf -> cf.withLocatorConnection(locatorPort));
 
@@ -93,8 +104,11 @@ public class CorruptPdxClientServerDUnitTest {
 
     client.invoke(() -> {
       final Region<Object, Object> region = getClientCache().getRegion(REGION_NAME);
-      region.remove(3);
-      region.destroy(4);
+      ResultCollector resultCollector = FunctionService.onRegion(region).execute(new ExportLocalDataFunction());
+      resultCollector.getResult();
+
+//      region.remove(3);
+//      region.destroy(4);
     });
 
   }