You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/10/13 17:15:05 UTC

[geode] branch develop updated: GEODE-3828: add tests for clients with use of delta

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new cf5029d  GEODE-3828: add tests for clients with use of delta
cf5029d is described below

commit cf5029dd77b9c937eb06f9b240650a181569bdc7
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Thu Oct 12 14:40:08 2017 -0700

    GEODE-3828: add tests for clients with use of delta
    
    * verifies that caching clients do receive delta updates
    * verifies that empty clients do not receive delta updates
---
 .../geode/internal/cache/InternalRegion.java       |  38 +++
 .../apache/geode/internal/cache/LocalRegion.java   |   2 +-
 .../tier/sockets/ClientProxyWithDeltaTest.java     | 308 +++++++++++++++++++++
 3 files changed, 347 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
new file mode 100644
index 0000000..a58df4c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.cache.Region;
+
+/**
+ * Interface to be used instead of type-casting to LocalRegion.
+ *
+ * <p>
+ * The following interfaces are implemented by LocalRegion and may need to be extended by
+ * InternalRegion to completely allow code to move to using InternalRegion:
+ * <ul>
+ * <li>RegionAttributes
+ * <li>AttributesMutator
+ * <li>CacheStatistics
+ * <li>DataSerializableFixedID
+ * <li>RegionEntryContext
+ * <li>Extensible
+ * </ul>
+ */
+public interface InternalRegion extends Region, HasCachePerfStats {
+
+  /**
+   * Returns the {@link CachePerfStats} exposed by this region.
+   *
+   * <p>
+   * NOTE(review): presumably this redeclares the accessor already inherited from
+   * {@code HasCachePerfStats} -- confirm before removing either declaration.
+   */
+  CachePerfStats getCachePerfStats();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index bc384c2..270c4b8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -227,7 +227,7 @@ import org.apache.logging.log4j.Logger;
  * distribution behavior.
  */
 @SuppressWarnings("deprecation")
-public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
+public class LocalRegion extends AbstractRegion implements InternalRegion, LoaderHelperFactory,
     ResourceListener<MemoryEvent>, DiskExceptionHandler, DiskRecoveryStore {
 
   // package-private to avoid synthetic accessor
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyWithDeltaTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyWithDeltaTest.java
new file mode 100644
index 0000000..5edc49e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyWithDeltaTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.AvailablePort.SOCKET;
+import static org.apache.geode.internal.AvailablePort.getRandomAvailablePort;
+import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.Delta;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@SuppressWarnings("serial")
+public class ClientProxyWithDeltaTest implements Serializable {
+
+  // Names of the two regions created on the server; each client attaches to one of them.
+  private static final String PROXY_NAME = "PROXY_NAME";
+  private static final String CACHING_PROXY_NAME = "CACHING_PROXY_NAME";
+
+  // Assigned by the create*Cache helpers and cleared again in tearDown, both locally and in
+  // every VM.
+  private static InternalCache cache;
+  private static InternalClientCache clientCache;
+
+  // Host name of the server VM, passed to the clients when they build their pools.
+  private String hostName;
+
+  // VMs used by the test and the port the cache server was started on; all set in setUp.
+  private VM server;
+  private VM client1;
+  private VM client2;
+  private int serverPort;
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  /**
+   * Starts the cache server in VM 0 and then creates a client cache with a PROXY region in each of
+   * the two client VMs (VM 1 and VM 3).
+   */
+  @Before
+  public void setUp() throws Exception {
+    server = getHost(0).getVM(0);
+    client1 = getHost(0).getVM(1);
+    client2 = getHost(0).getVM(3);
+
+    hostName = getServerHostName(server.getHost());
+
+    // the server must be running before the clients can connect to its port
+    serverPort = server.invoke(() -> createServerCache());
+
+    for (VM client : new VM[] {client1, client2}) {
+      client.invoke(() -> createClientCacheWithProxyRegion(hostName, serverPort));
+    }
+  }
+
+  /**
+   * Resets the static flags used by the tests in every VM, calls {@code disconnectAllFromDS()} and
+   * clears the static cache references locally and in every VM.
+   */
+  @After
+  public void tearDown() throws Exception {
+    invokeInEveryVM(() -> DeltaEnabledObject.resetFromDeltaInvoked());
+    invokeInEveryVM(() -> CacheClientUpdater.isUsedByTest = false);
+    // keyZeroCreated is static as well; clear it so that a later run in the same VM cannot see a
+    // stale "true" and return from its await immediately
+    invokeInEveryVM(() -> ClientListener.keyZeroCreated.set(false));
+
+    disconnectAllFromDS();
+
+    cache = null;
+    invokeInEveryVM(() -> cache = null);
+
+    clientCache = null;
+    invokeInEveryVM(() -> clientCache = null);
+  }
+
+  /**
+   * Verifies that a delta put reaches a client with a CACHING_PROXY region as a delta, that is
+   * {@code fromDelta} is invoked on the receiving client and no full value is requested.
+   */
+  @Test
+  public void cachingClientReceivesDeltaUpdates() throws Exception {
+    // setUp created PROXY clients; replace them with CACHING_PROXY clients for this test
+    for (VM client : new VM[] {client1, client2}) {
+      client.invoke(() -> {
+        clientCache.close();
+        clientCache = null;
+        createClientCacheWithCachingRegion(hostName, serverPort);
+      });
+    }
+
+    client2.invoke(() -> {
+      CacheClientUpdater.isUsedByTest = true;
+    });
+
+    client1.invoke(() -> {
+      Region<Integer, DeltaEnabledObject> cachingRegion = clientCache.getRegion(CACHING_PROXY_NAME);
+      DeltaEnabledObject reusedValue = new DeltaEnabledObject();
+      // put the same instance under key 1 three times with a changing value
+      for (int newValue = 1; newValue <= 3; newValue++) {
+        reusedValue.setValue(newValue);
+        cachingRegion.put(1, reusedValue);
+      }
+      // key 0 is put last; the receiving client waits for it before asserting
+      cachingRegion.put(0, new DeltaEnabledObject());
+    });
+
+    client2.invoke(() -> {
+      await().atMost(30, SECONDS)
+          .until(() -> clientCache.getRegion(CACHING_PROXY_NAME).containsKey(0));
+      assertThat(CacheClientUpdater.fullValueRequested).isFalse();
+      assertThat(DeltaEnabledObject.fromDeltaInvoked()).isTrue();
+    });
+  }
+
+  /**
+   * Verifies that a delta put reaches a client with a PROXY region as a complete object, that is
+   * {@code fromDelta} is never invoked on the receiving client.
+   */
+  @Test
+  public void emptyClientReceivesFullUpdatesInsteadOfDeltaUpdates() throws Exception {
+    client2.invoke(() -> {
+      CacheClientUpdater.isUsedByTest = true;
+      Region<Integer, DeltaEnabledObject> proxyRegion = clientCache.getRegion(PROXY_NAME);
+      proxyRegion.getAttributesMutator().addCacheListener(new ClientListener());
+    });
+
+    client1.invoke(() -> {
+      Region<Integer, DeltaEnabledObject> proxyRegion = clientCache.getRegion(PROXY_NAME);
+      DeltaEnabledObject reusedValue = new DeltaEnabledObject();
+      // put the same instance under key 1 three times with a changing value
+      for (int newValue = 1; newValue <= 3; newValue++) {
+        reusedValue.setValue(newValue);
+        proxyRegion.put(1, reusedValue);
+      }
+      // key 0 is put last; the receiving client waits on its listener flag before asserting
+      proxyRegion.put(0, new DeltaEnabledObject());
+    });
+
+    client2.invoke(() -> {
+      await().atMost(30, SECONDS).until(() -> ClientListener.keyZeroCreated.get());
+      assertThat(CacheClientUpdater.fullValueRequested).isFalse();
+      assertThat(DeltaEnabledObject.fromDeltaInvoked()).isFalse();
+    });
+  }
+
+  /**
+   * Verifies that reusing one delta-capable instance as the value for creates under different keys
+   * does not use delta: the server records no failed delta updates and never invokes
+   * {@code fromDelta}.
+   */
+  @Test
+  public void reusingValueForCreatesDoesNotUseDelta() throws Exception {
+    client1.invoke(() -> {
+      Region<Integer, DeltaEnabledObject> proxyRegion = clientCache.getRegion(PROXY_NAME);
+      DeltaEnabledObject reusedValue = new DeltaEnabledObject();
+      // each iteration creates a new key (1, 2, 3) with the same, mutated instance
+      for (int key = 1; key <= 3; key++) {
+        reusedValue.setValue(key);
+        proxyRegion.create(key, reusedValue);
+      }
+    });
+
+    server.invoke(() -> {
+      InternalRegion serverRegion = (InternalRegion) cache.getRegion(PROXY_NAME);
+      CachePerfStats stats = serverRegion.getCachePerfStats();
+      assertThat(stats.getDeltaFailedUpdates()).isEqualTo(0);
+      assertThat(DeltaEnabledObject.fromDeltaInvoked()).isFalse();
+    });
+  }
+
+  /**
+   * Creates the server cache with the two REPLICATE / DISTRIBUTED_ACK regions used by the clients
+   * and starts a cache server on a randomly chosen available port.
+   *
+   * @return the port the cache server reports after starting
+   * @throws IOException declared because of {@code CacheServer.start()}
+   */
+  private int createServerCache() throws IOException {
+    cache = (InternalCache) new CacheFactory().create();
+
+    RegionFactory<Integer, DeltaEnabledObject> replicateFactory = cache.createRegionFactory();
+    replicateFactory.setDataPolicy(DataPolicy.REPLICATE);
+    replicateFactory.setScope(Scope.DISTRIBUTED_ACK);
+
+    for (String regionName : new String[] {PROXY_NAME, CACHING_PROXY_NAME}) {
+      replicateFactory.create(regionName);
+    }
+
+    CacheServer cacheServer = cache.addCacheServer();
+    cacheServer.setPort(getRandomAvailablePort(SOCKET));
+    cacheServer.start();
+    return cacheServer.getPort();
+  }
+
+  /**
+   * Creates a client cache whose pool points at the given server and attaches a PROXY region named
+   * {@code PROXY_NAME} with interest registered in all keys.
+   *
+   * @param hostName host name of the cache server
+   * @param port port of the cache server
+   */
+  private void createClientCacheWithProxyRegion(final String hostName, final int port) {
+    createClientCacheWithRegion(hostName, port, PROXY_NAME, ClientRegionShortcut.PROXY);
+  }
+
+  /**
+   * Creates a client cache whose pool points at the given server and attaches a CACHING_PROXY
+   * region named {@code CACHING_PROXY_NAME} with interest registered in all keys.
+   *
+   * @param hostName host name of the cache server
+   * @param port port of the cache server
+   */
+  private void createClientCacheWithCachingRegion(final String hostName, final int port) {
+    createClientCacheWithRegion(hostName, port, CACHING_PROXY_NAME,
+        ClientRegionShortcut.CACHING_PROXY);
+  }
+
+  /**
+   * Shared implementation of the two client factories above: creates the client cache and its
+   * pool, creates the region, registers interest in all keys and asserts that cloning is disabled.
+   */
+  private void createClientCacheWithRegion(final String hostName, final int port,
+      final String regionName, final ClientRegionShortcut shortcut) {
+    clientCache = (InternalClientCache) new ClientCacheFactory().create();
+    assertThat(clientCache.isClient()).isTrue();
+
+    PoolFactory poolFactory = createPoolFactory();
+    poolFactory.addServer(hostName, port);
+
+    Pool pool = poolFactory.create(getClass().getSimpleName() + "-Pool");
+
+    Region<Integer, DeltaEnabledObject> region = createRegionOnClient(regionName, shortcut, pool);
+
+    region.registerInterest("ALL_KEYS");
+    // re-checked after registering interest, as both original methods did
+    assertThat(region.getAttributes().getCloningEnabled()).isFalse();
+  }
+
+  /**
+   * Builds the pool factory shared by all clients: thread-local connections, three minimum
+   * connections, subscriptions enabled with redundancy 0, a read timeout of 10000 and a socket
+   * buffer size of 32768. The server location is added by the caller.
+   */
+  private PoolFactory createPoolFactory() {
+    PoolFactory configuredFactory = PoolManager.createFactory()
+        .setThreadLocalConnections(true)
+        .setMinConnections(3)
+        .setSubscriptionEnabled(true)
+        .setSubscriptionRedundancy(0)
+        .setReadTimeout(10000)
+        .setSocketBufferSize(32768);
+    return configuredFactory;
+  }
+
+  /**
+   * Creates a client region bound to the given pool and asserts that cloning is disabled on it.
+   *
+   * @param regionName name of the region to create
+   * @param shortcut client region shortcut that selects the region type
+   * @param pool pool whose name is set on the region factory
+   * @return the newly created region
+   */
+  private Region<Integer, DeltaEnabledObject> createRegionOnClient(final String regionName,
+      final ClientRegionShortcut shortcut, final Pool pool) {
+    ClientRegionFactory<Integer, DeltaEnabledObject> clientRegionFactory =
+        clientCache.createClientRegionFactory(shortcut);
+    clientRegionFactory.setPoolName(pool.getName());
+
+    Region<Integer, DeltaEnabledObject> createdRegion = clientRegionFactory.create(regionName);
+    assertThat(createdRegion.getAttributes().getCloningEnabled()).isFalse();
+    return createdRegion;
+  }
+
+  /**
+   * Client-side listener that records when the entry for key {@code 0} has been created.
+   */
+  private static class ClientListener extends CacheListenerAdapter<Integer, DeltaEnabledObject> {
+
+    static final AtomicBoolean keyZeroCreated = new AtomicBoolean(false);
+
+    @Override
+    public void afterCreate(EntryEvent<Integer, DeltaEnabledObject> event) {
+      // Only the create of key 0 counts. Key 1 is created first, so setting the flag for every
+      // create would release the waiting test before the last put has arrived.
+      if (Integer.valueOf(0).equals(event.getKey())) {
+        keyZeroCreated.set(true);
+      }
+    }
+  }
+
+  /**
+   * Value type implementing {@code Delta} for use in {@code Cache}. Its delta is the single int
+   * {@code value}; a static flag records whether {@code fromDelta} has been invoked in this JVM.
+   */
+  private static class DeltaEnabledObject implements Delta, Serializable {
+
+    private static final AtomicBoolean fromDeltaInvoked = new AtomicBoolean();
+
+    private int value = 0;
+
+    /** Returns whether {@link #fromDelta(DataInput)} has been called since the last reset. */
+    static boolean fromDeltaInvoked() {
+      return fromDeltaInvoked.get();
+    }
+
+    /** Clears the flag reported by {@link #fromDeltaInvoked()}. */
+    static void resetFromDeltaInvoked() {
+      fromDeltaInvoked.set(false);
+    }
+
+    public void setValue(int value) {
+      this.value = value;
+    }
+
+    /** Always reports a delta, so every put of an instance is eligible for delta handling. */
+    @Override
+    public boolean hasDelta() {
+      return true;
+    }
+
+    /** Writes the current value as the delta. */
+    @Override
+    public void toDelta(DataOutput out) throws IOException {
+      DataSerializer.writePrimitiveInt(value, out);
+    }
+
+    /** Records the invocation and then replaces the value with the int read from the delta. */
+    @Override
+    public void fromDelta(DataInput in) throws IOException {
+      fromDeltaInvoked.set(true);
+      value = DataSerializer.readPrimitiveInt(in);
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
commits@geode.apache.org.