You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/13 17:18:43 UTC
[22/22] geode git commit: Create ClientCachePutBench
Create ClientCachePutBench
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/407afd93
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/407afd93
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/407afd93
Branch: refs/heads/feature/GEODE-2632
Commit: 407afd93f3f0ca2d0e6a058fbae4aa66654fdeab
Parents: 796c15e
Author: Kirk Lund <kl...@apache.org>
Authored: Mon Apr 3 14:52:28 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Apr 13 09:40:47 2017 -0700
----------------------------------------------------------------------
geode-core/build.gradle | 5 +
.../sockets/command/ClientCachePutBench.java | 184 +++++
.../cache/tier/sockets/command/Put65Bench.java | 120 ++++
.../command/ClientCachePutBench-server.xml | 29 +
.../internal/ClusterConfigurationService.java | 20 +-
.../geode/internal/cache/CacheServerImpl.java | 2 +-
.../geode/internal/cache/tier/Acceptor.java | 2 +-
.../cache/tier/sockets/AcceptorImpl.java | 42 +-
.../cache/tier/sockets/BaseCommand.java | 35 +-
.../cache/tier/sockets/CacheClientNotifier.java | 583 +++-------------
.../cache/tier/sockets/CacheClientProxy.java | 698 ++++++++-----------
.../cache/tier/sockets/ClientHealthMonitor.java | 15 +-
.../tier/sockets/ClientProxyMembershipID.java | 5 +-
.../tier/sockets/ClientUpdateMessageImpl.java | 4 +-
.../internal/cache/tier/sockets/HandShake.java | 12 +-
.../geode/internal/logging/LogService.java | 10 +
.../geode/distributed/ServerLauncherUtils.java | 30 +
.../tier/sockets/AcceptorImplJUnitTest.java | 22 +-
.../cache/tier/sockets/AcceptorImplTest.java | 96 +++
.../cache/tier/sockets/CacheServerUtils.java | 55 ++
.../tier/sockets/ClientConflationDUnitTest.java | 2 +-
.../ClientServerForceInvalidateDUnitTest.java | 4 +-
.../tier/sockets/ClientServerMiscDUnitTest.java | 9 +-
.../cache/tier/sockets/ConflationDUnitTest.java | 4 +-
.../cache/tier/sockets/HAInterestTestCase.java | 26 +-
.../sockets/HAStartupAndFailoverDUnitTest.java | 4 +-
.../sockets/InterestListRecoveryDUnitTest.java | 4 +-
.../tier/sockets/RedundancyLevelTestBase.java | 17 +-
.../command/ExperimentIntegrationTest.java | 80 +++
.../tier/sockets/command/ExperimentTest.java | 121 ++++
.../tier/sockets/command/Put65BenchTest.java | 116 +++
.../sockets/command/Put65RealBenchTest.java | 141 ++++
.../sockets/DurableClientSimpleDUnitTest.java | 14 +-
.../tier/sockets/DurableClientTestCase.java | 6 +-
.../cache/wan/Simple2CacheServerDUnitTest.java | 6 +-
35 files changed, 1541 insertions(+), 982 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/build.gradle
----------------------------------------------------------------------
diff --git a/geode-core/build.gradle b/geode-core/build.gradle
index 757599a..fd56fe1 100755
--- a/geode-core/build.gradle
+++ b/geode-core/build.gradle
@@ -17,6 +17,7 @@
apply plugin: 'antlr'
+apply plugin: 'me.champeau.gradle.jmh'
sourceSets {
jca {
@@ -220,5 +221,9 @@ dependencies {
classesOutput sourceSets.main.output
}
+jmh {
+ duplicateClassesStrategy = 'warn'
+}
+
tasks.eclipse.dependsOn(generateGrammarSource)
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
new file mode 100644
index 0000000..df51b78
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.commons.io.FileUtils.*;
+import static org.apache.geode.distributed.AbstractLauncher.Status.ONLINE;
+import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.*;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.distributed.AbstractLauncher.Status;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.net.SocketCreator;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@Measurement(iterations = 5)
+@Warmup(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@State(Scope.Thread)
+@SuppressWarnings("unused")
+public class ClientCachePutBench {
+
+ @Test
+ public void tempTest() throws Exception {
+ String SERVER_XML_FILE_NAME =
+ "/" + StringUtils.replace(ClientCachePutBench.class.getPackage().getName(), ".", "/")
+ + "/ClientCachePutBench-server.xml";
+ assertThat(new File(getClass().getResource(SERVER_XML_FILE_NAME).getFile())).exists();
+ }
+
+ @State(Scope.Benchmark)
+ public static class ClientState {
+ // public static final String SERVER_XML_FILE_NAME =
+ // "/" + StringUtils.replace(ClientCachePutBench.class.getPackage().getName(), ".", "/")
+ // + "/ClientCachePutBench-server.xml";
+ public static final String REGION_NAME = "clientCachePutBench-region";
+
+ public Random random;
+
+ public int serverPort;
+ public Process process;
+ public ServerLauncher launcher;
+ public File serverDirectory;
+
+ public ClientCache clientCache;
+ public Region<String, String> region;
+
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Setup(Level.Trial)
+ public void startServer() throws Exception {
+ this.random = new Random(System.nanoTime());
+
+ this.temporaryFolder.create();
+ this.serverDirectory = this.temporaryFolder.getRoot();
+
+ String SERVER_XML_FILE_NAME =
+ "/" + StringUtils.replace(ClientCachePutBench.class.getPackage().getName(), ".", "/")
+ + "/ClientCachePutBench-server.xml";
+
+ URL srcServerXml = getClass().getResource(SERVER_XML_FILE_NAME);
+ assertThat(srcServerXml).isNotNull();
+ File destServerXml = new File(this.serverDirectory, SERVER_XML_FILE_NAME);
+ copyURLToFile(srcServerXml, destServerXml);
+
+ this.serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+ List<String> jvmArguments = getJvmArguments();
+
+ List<String> command = new ArrayList<>();
+ command.add(
+ new File(new File(System.getProperty("java.home"), "bin"), "java").getCanonicalPath());
+ for (String jvmArgument : jvmArguments) {
+ command.add(jvmArgument);
+ }
+ command.add("-Dgemfire.cache-xml-file=" + destServerXml.getAbsolutePath());
+ command.add("-cp");
+ command.add(System.getProperty("java.class.path"));
+ command.add(ServerLauncher.class.getName());
+ command.add(ServerLauncher.Command.START.getName());
+ command.add("server1");
+ command.add("--server-port=" + this.serverPort);
+ // put65Command.add("--redirect-output");
+
+ this.process = new ProcessBuilder(command).directory(this.temporaryFolder.getRoot()).start();
+
+ boolean sleep = false;
+ while (sleep) {
+ assertThat(this.process.isAlive()).isTrue();
+ Thread.sleep(10000);
+ }
+
+ ServerLauncher serverLauncher = new ServerLauncher.Builder()
+ .setWorkingDirectory(this.temporaryFolder.getRoot().getAbsolutePath()).build();
+
+ await().atMost(2, MINUTES)
+ .until(() -> assertThat(serverLauncher.status().getStatus()).isEqualTo(ONLINE));
+
+ this.clientCache =
+ new ClientCacheFactory().addPoolServer(getIPLiteral(), this.serverPort).create();
+ this.region =
+ this.clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(REGION_NAME);
+ }
+
+ @TearDown(Level.Trial)
+ public void stopServer() throws Exception {
+ try {
+ this.clientCache.close(false);
+ new ServerLauncher.Builder().setWorkingDirectory(this.serverDirectory.getAbsolutePath())
+ .build().stop();
+ } finally {
+ if (this.process != null) {
+ this.process.destroyForcibly();
+ }
+ this.temporaryFolder.delete();
+ }
+ }
+
+ private List<String> getJvmArguments() {
+ List<String> jvmArguments = new ArrayList<>();
+ jvmArguments.add(
+ "-D" + DistributionConfig.GEMFIRE_PREFIX + ConfigurationProperties.MCAST_PORT + "=0");
+ jvmArguments.add(
+ "-D" + DistributionConfig.GEMFIRE_PREFIX + ConfigurationProperties.LOCATORS + "\"\"");
+ return jvmArguments;
+ }
+ }
+
+ @Benchmark
+ public void test(ClientState state, Blackhole blackhole) throws Exception {
+ String key = "key-" + state.random.nextInt();
+ String value = "value-" + state.random.nextInt();
+ String oldValue = state.region.put(key, value);
+ blackhole.consume(new Object[] {key, value, oldValue});
+ blackhole.consume(oldValue);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
new file mode 100644
index 0000000..d393769
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.SystemFailure.loadEmergencyClasses;
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+
+public class Put65Bench {
+
+ @State(Scope.Benchmark)
+ public static class ServerConnectionState {
+ public Command put65Command;
+ public ServerConnection mockServerConnection;
+ public Message mockMessage;
+
+ @Setup(Level.Trial)
+ public void setup() throws Exception {
+ loadEmergencyClasses();
+
+ this.put65Command = Put65.getCommand();
+
+ this.mockServerConnection = mock(ServerConnection.class,
+ withSettings().defaultAnswer(CALLS_REAL_METHODS).name("mockServerConnection"));
+ when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+ GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class, withSettings().name("mockCache"));
+ when(this.mockServerConnection.getCache()).thenReturn(mockCache);
+
+ TXManagerImpl mockTxManager = mock(TXManagerImpl.class, withSettings().name("mockTxManager"));
+ when(mockCache.getTxManager()).thenReturn(mockTxManager);
+
+ CacheServerStats mockCacheServerStats =
+ mock(CacheServerStats.class, withSettings().name("mockCacheServerStats"));
+ when(this.mockServerConnection.getCacheServerStats()).thenReturn(mockCacheServerStats);
+
+ ClientProxyMembershipID mockProxyId =
+ mock(ClientProxyMembershipID.class, withSettings().name("mockProxyId"));
+ when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+ Message mockErrorResponseMessage =
+ mock(Message.class, withSettings().name("mockErrorResponseMessage"));
+ when(this.mockServerConnection.getErrorResponseMessage())
+ .thenReturn(mockErrorResponseMessage);
+
+ Part mockRegionNamePart = mock(Part.class, withSettings().name("mockRegionNamePart"));
+ when(mockRegionNamePart.getString()).thenReturn("mockRegionNamePart");
+
+ Part mockOperationPart = mock(Part.class);
+ when(mockOperationPart.getObject()).thenReturn(Operation.UPDATE);
+
+ Part mockFlagsPart = mock(Part.class);
+ when(mockFlagsPart.getInt()).thenReturn(0);
+
+ Part mockKeyPart = mock(Part.class);
+ when(mockKeyPart.getObject()).thenReturn("mockKeyPart");
+ when(mockKeyPart.getStringOrObject()).thenReturn("mockKeyPart");
+
+ Part mockIsDeltaPart = mock(Part.class);
+ when(mockIsDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+ Part mockValuePart = mock(Part.class);
+ when(mockValuePart.getObject()).thenReturn("mockValuePart");
+
+ Part mockEventPart = mock(Part.class);
+ when(mockEventPart.getObject()).thenReturn("mockEventPart");
+
+ Part mockCallbackArgPart = mock(Part.class);
+ when(mockCallbackArgPart.getObject()).thenReturn("mockCallbackArgPart");
+
+ mockMessage = mock(Message.class);
+
+ when(mockMessage.getTransactionId()).thenReturn(NOTX);
+
+ when(mockMessage.getPart(0)).thenReturn(mockRegionNamePart);
+ when(mockMessage.getPart(1)).thenReturn(mockOperationPart);
+ when(mockMessage.getPart(2)).thenReturn(mockFlagsPart);
+ when(mockMessage.getPart(3)).thenReturn(mockKeyPart);
+ when(mockMessage.getPart(4)).thenReturn(mockIsDeltaPart);
+ when(mockMessage.getPart(5)).thenReturn(mockValuePart);
+ when(mockMessage.getPart(6)).thenReturn(mockEventPart);
+ when(mockMessage.getPart(7)).thenReturn(mockCallbackArgPart);
+ }
+ }
+
+ // @Benchmark
+ public void benchmark(ServerConnectionState state, Blackhole blackhole) {
+ state.put65Command.execute(state.mockMessage, state.mockServerConnection);
+ // Message replyMessage = state.mockServerConnection.getReplyMessage();
+ // blackhole.consume(replyMessage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml b/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
new file mode 100644
index 0000000..2013b37
--- /dev/null
+++ b/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<cache
+ xmlns="http://geode.apache.org/schema/cache"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
+ version="1.0">
+ <region name="clientCachePutBench-region" refid="REPLICATE">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.String</value-constraint>
+ </region-attributes>
+ </region>
+</cache>
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
index 95d1a5b..74df19c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
@@ -386,10 +386,22 @@ public class ClusterConfigurationService {
createConfigDirIfNecessary(groupName);
- byte[] jarBytes = locators.stream()
- .map((DistributedMember locator) -> downloadJarFromLocator(locator, groupName, jarName))
- .filter(Objects::nonNull).findFirst().orElseThrow(() -> new IllegalStateException(
- "No locators have a deployed jar named " + jarName + " in " + groupName));
+ byte[] jarBytes = null;
+ for (DistributedMember locator : locators) {
+ jarBytes = downloadJarFromLocator(locator, groupName, jarName);
+ if (jarBytes != null) {
+ break;
+ }
+ }
+ if (jarBytes == null) {
+ throw new IllegalStateException(
+ "No locators have a deployed jar named " + jarName + " in " + groupName);
+ }
+
+ // byte[] jarBytes = locators.stream()
+ // .map((DistributedMember locator) -> downloadJarFromLocator(locator, groupName, jarName))
+ // .filter(Objects::nonNull).findFirst().orElseThrow(() -> new IllegalStateException(
+ // "No locators have a deployed jar named " + jarName + " in " + groupName));
File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile();
FileUtils.writeByteArrayToFile(jarToWrite, jarBytes);
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index a3c4a93..2294fb8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -317,7 +317,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
getSocketBufferSize(), getMaximumTimeBetweenPings(), this.cache, getMaxConnections(),
getMaxThreads(), getMaximumMessageCount(), getMessageTimeToLive(), this.loadMonitor,
overflowAttributesList, this.isGatewayReceiver, this.gatewayTransportFilters,
- this.tcpNoDelay);
+ this.tcpNoDelay, this.cache.getCancelCriterion());
this.acceptor.start();
this.advisor.handshake();
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 9a3241b..97dcba5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -25,7 +25,7 @@ import org.apache.geode.internal.Version;
*
* @since GemFire 2.0.2
*/
-public abstract class Acceptor {
+public interface Acceptor {
// The following are communications "mode" bytes sent as the first byte of a
// client/server handshake. They must not be larger than 1 byte
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index ed29472..d8c64f4 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -57,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLException;
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
@@ -97,7 +100,7 @@ import org.apache.geode.internal.util.ArrayUtils;
* @since GemFire 2.0.2
*/
@SuppressWarnings("deprecation")
-public class AcceptorImpl extends Acceptor implements Runnable {
+public class AcceptorImpl implements Acceptor, Runnable {
private static final Logger logger = LogService.getLogger();
private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
@@ -283,7 +286,8 @@ public class AcceptorImpl extends Acceptor implements Runnable {
* @param internalCache The GemFire cache whose contents is served to clients
* @param maxConnections the maximum number of connections allowed in the server pool
* @param maxThreads the maximum number of threads allowed in the server pool
- *
+ *
+ * @param cancelCriterion
* @see SocketCreator#createServerSocket(int, int, InetAddress)
* @see ClientHealthMonitor
* @since GemFire 5.7
@@ -292,12 +296,18 @@ public class AcceptorImpl extends Acceptor implements Runnable {
int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
- List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay) throws IOException {
+ List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+ final CancelCriterion cancelCriterion) throws IOException {
this.bindHostName = calcBindHostName(internalCache, bindHostName);
this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
this.notifyBySubscription = notifyBySubscription;
this.isGatewayReceiver = isGatewayReceiver;
this.gatewayTransportFilters = transportFilter;
+
+ this.socketBufferSize = socketBufferSize;
+ this.cache = internalCache;
+ this.crHelper = new CachedRegionHelper(this.cache);
+
{
int tmp_maxConnections = maxConnections;
if (tmp_maxConnections < MINIMUM_MAX_CONNECTIONS) {
@@ -375,12 +385,6 @@ public class AcceptorImpl extends Acceptor implements Runnable {
.getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY);
}
- final GemFireCacheImpl gc;
- if (getCachedRegionHelper() != null) {
- gc = (GemFireCacheImpl) getCachedRegionHelper().getCache();
- } else {
- gc = null;
- }
final int backLog = Integer.getInteger(BACKLOG_PROPERTY_NAME, DEFAULT_BACKLOG).intValue();
final long tilt = System.currentTimeMillis() + 120 * 1000;
@@ -422,9 +426,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
Thread.currentThread().interrupt();
}
}
- if (gc != null) {
- gc.getCancelCriterion().checkCancelInProgress(null);
- }
+ cancelCriterion.checkCancelInProgress(null);
} // for
} // isSelector
else { // !isSelector
@@ -452,9 +454,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
Thread.currentThread().interrupt();
}
}
- if (gc != null) {
- gc.getCancelCriterion().checkCancelInProgress(null);
- }
+ cancelCriterion.checkCancelInProgress(null);
} // for
} // !isSelector
@@ -485,15 +485,15 @@ public class AcceptorImpl extends Acceptor implements Runnable {
}
- this.cache = internalCache;
- this.crHelper = new CachedRegionHelper(this.cache);
+ final StatisticsFactory statsFactory =
+ isGatewayReceiver ? new DummyStatisticsFactory() : this.cache.getDistributedSystem();
- this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount,
- messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver);
- this.socketBufferSize = socketBufferSize;
+ this.clientNotifier =
+ CacheClientNotifier.getInstance(this.cache, this.stats, statsFactory, maximumMessageCount,
+ messageTimeToLive, this.connectionListener, overflowAttributesList, isGatewayReceiver);
// Create the singleton ClientHealthMonitor
- this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
+ this.healthMonitor = ClientHealthMonitor.getInstance(this.cache, maximumTimeBetweenPings,
this.clientNotifier.getStats());
{
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index d217672..ff9daca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -91,31 +91,26 @@ public abstract class BaseCommand implements Command {
private static final int MAX_INCOMING_MSGS =
Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1).intValue();
- private static final Semaphore incomingDataLimiter;
-
- private static final Semaphore incomingMsgLimiter;
- static {
- Semaphore tmp;
- if (MAX_INCOMING_DATA > 0) {
- // backport requires that this is fair since we inc by values > 1
- tmp = new Semaphore(MAX_INCOMING_DATA, true);
- } else {
- tmp = null;
- }
- incomingDataLimiter = tmp;
- if (MAX_INCOMING_MSGS > 0) {
- tmp = new Semaphore(MAX_INCOMING_MSGS, false); // unfair for best
- // performance
- } else {
- tmp = null;
- }
- incomingMsgLimiter = tmp;
+ // backport requires that this is fair since we inc by values > 1
+ private static final Semaphore incomingDataLimiter =
+ createIncomingLimiterSemaphore(MAX_INCOMING_DATA, true);
+
+ // unfair for best performance
+ private static final Semaphore incomingMsgLimiter =
+ createIncomingLimiterSemaphore(MAX_INCOMING_MSGS, false);
+ private static Semaphore createIncomingLimiterSemaphore(final int maximum, final boolean fair) {
+ Semaphore semaphore = null;
+ if (maximum > 0) {
+ semaphore = new Semaphore(maximum, fair);
+ }
+ return semaphore;
}
protected SecurityService securityService = IntegratedSecurityService.getSecurityService();
- final public void execute(Message msg, ServerConnection servConn) {
+ @Override
+ public void execute(final Message msg, final ServerConnection servConn) {
// Read the request and update the statistics
long start = DistributionStats.getStatTime();
// servConn.resetTransientData();
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 28d6ae2..813d569 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -43,6 +43,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.geode.internal.security.SecurityService;
import org.apache.logging.log4j.Logger;
import org.apache.shiro.subject.Subject;
@@ -79,7 +80,6 @@ import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.ClassLoadUtil;
-import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.InternalInstantiator;
import org.apache.geode.internal.net.SocketCloser;
@@ -113,10 +113,8 @@ import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.security.AccessControl;
import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.AuthenticationRequiredException;
@@ -125,32 +123,25 @@ import org.apache.geode.security.AuthenticationRequiredException;
* Class <code>CacheClientNotifier</code> works on the server and manages client socket connections
* to clients requesting notification of updates and notifies them when updates occur.
*
- *
* @since GemFire 3.2
*/
@SuppressWarnings({"synthetic-access", "deprecation"})
public class CacheClientNotifier {
private static final Logger logger = LogService.getLogger();
+ private static final Logger securityLogger = LogService.getSecurityLogger();
private static volatile CacheClientNotifier ccnSingleton;
/**
* Factory method to construct a CacheClientNotifier <code>CacheClientNotifier</code> instance.
- *
- * @param cache The GemFire <code>Cache</code>
- * @param acceptorStats
- * @param maximumMessageCount
- * @param messageTimeToLive
- * @param listener
- * @param overflowAttributesList
- * @return A <code>CacheClientNotifier</code> instance
*/
- public static synchronized CacheClientNotifier getInstance(Cache cache,
- CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive,
- ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) {
+ public static synchronized CacheClientNotifier getInstance(final Cache cache,
+ final CacheServerStats acceptorStats, final StatisticsFactory statsFactory,
+ final int maximumMessageCount, final int messageTimeToLive, final ConnectionListener listener,
+ final List overflowAttributesList, final boolean isGatewayReceiver) {
if (ccnSingleton == null) {
- ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount,
- messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver);
+ ccnSingleton = new CacheClientNotifier(cache, acceptorStats, statsFactory,
+ maximumMessageCount, messageTimeToLive, listener);
}
if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -158,13 +149,6 @@ public class CacheClientNotifier {
// In this case, the HaContainer should be lazily created here
ccnSingleton.initHaContainer(overflowAttributesList);
}
- // else {
- // ccnSingleton.acceptorStats = acceptorStats;
- // ccnSingleton.maximumMessageCount = maximumMessageCount;
- // ccnSingleton.messageTimeToLive = messageTimeToLive;
- // ccnSingleton._connectionListener = listener;
- // ccnSingleton.setCache((GemFireCache)cache);
- // }
return ccnSingleton;
}
@@ -173,6 +157,51 @@ public class CacheClientNotifier {
}
/**
+ * Constructor.
+ *
+ * @param cache The GemFire <code>Cache</code>
+ * @param acceptorStats
+ * @param statsFactory
+ * @param maximumMessageCount
+ * @param messageTimeToLive
+ * @param listener a listener which should receive notifications abouts queues being added or
+ * removed.
+ */
+ private CacheClientNotifier(final Cache cache, final CacheServerStats acceptorStats,
+ final StatisticsFactory statsFactory, final int maximumMessageCount,
+ final int messageTimeToLive, final ConnectionListener listener) {
+ // Set the Cache
+ this.setCache((GemFireCacheImpl) cache);
+ this.acceptorStats = acceptorStats;
+ // we only need one thread per client and wait 50ms for close
+ this.socketCloser = new SocketCloser(1, 50);
+ this._connectionListener = listener;
+
+ this.maximumMessageCount = maximumMessageCount;
+ this.messageTimeToLive = messageTimeToLive;
+
+ this._statistics = new CacheClientNotifierStats(statsFactory);
+
+ try {
+ this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
+ if (this.logFrequency <= 0) {
+ this.logFrequency = DEFAULT_LOG_FREQUENCY;
+ }
+ } catch (Exception e) {
+ this.logFrequency = DEFAULT_LOG_FREQUENCY;
+ }
+
+ eventEnqueueWaitTime =
+ Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
+ if (eventEnqueueWaitTime < 0) {
+ eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
+ }
+
+ // Schedule task to periodically ping clients.
+ scheduleClientPingTask();
+ }
+
+ /**
* Writes a given message to the output stream
*
* @param dos the <code>DataOutputStream</code> to use for writing the message
@@ -257,32 +286,12 @@ public class CacheClientNotifier {
writeMessage(dos, type, ex.toString(), clientVersion);
}
- // /**
- // * Factory method to return the singleton <code>CacheClientNotifier</code>
- // * instance.
- // * @return the singleton <code>CacheClientNotifier</code> instance
- // */
- // public static CacheClientNotifier getInstance()
- // {
- // return _instance;
- // }
-
- // /**
- // * Shuts down the singleton <code>CacheClientNotifier</code> instance.
- // */
- // public static void shutdownInstance()
- // {
- // if (_instance == null) return;
- // _instance.shutdown();
- // _instance = null;
- // }
-
/**
* Registers a new client updater that wants to receive updates with this server.
*
* @param socket The socket over which the server communicates with the client.
*/
- public void registerClient(Socket socket, boolean isPrimary, long acceptorId,
+ void registerClient(Socket socket, boolean isPrimary, long acceptorId,
boolean notifyBySubscription) throws IOException {
// Since no remote ports were specified in the message, wait for them.
long startTime = this._statistics.startTime();
@@ -329,7 +338,7 @@ public class CacheClientNotifier {
}
}
- protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
+ private void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
boolean isPrimary, long startTime, Version clientVersion, long acceptorId,
boolean notifyBySubscription) throws IOException {
// Read the ports and throw them away. We no longer need them
@@ -382,26 +391,27 @@ public class CacheClientNotifier {
// TODO:hitesh
Properties credentials = HandShake.readCredentials(dis, dos, system);
if (credentials != null && proxy != null) {
- if (securityLogWriter.fineEnabled()) {
- securityLogWriter
- .fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID);
+ if (securityLogger.isDebugEnabled()) {
+ securityLogger.debug("CacheClientNotifier: verifying credentials for proxyID: {}",
+ proxyID);
}
- Object subject = HandShake.verifyCredentials(authenticator, credentials,
- system.getSecurityProperties(), this.logWriter, this.securityLogWriter, member);
+ Object subject =
+ HandShake.verifyCredentials(authenticator, credentials, system.getSecurityProperties(),
+ system.getLogWriter(), system.getSecurityLogWriter(), member);
if (subject instanceof Principal) {
Principal principal = (Principal) subject;
- if (securityLogWriter.fineEnabled()) {
- securityLogWriter
- .fine("CacheClientNotifier: successfully verified credentials for proxyID: "
- + proxyID + " having principal: " + principal.getName());
+ if (securityLogger.isDebugEnabled()) {
+ securityLogger.debug(
+ "CacheClientNotifier: successfully verified credentials for proxyID: {} having principal: {}",
+ proxyID, principal.getName());
}
String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP);
if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) {
if (principal == null) {
- securityLogWriter.warning(
+ securityLogger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1,
- new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID});
+ new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID}));
}
Method authzMethod = ClassLoadUtil.methodFromName(postAuthzFactoryName);
authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null);
@@ -417,15 +427,15 @@ public class CacheClientNotifier {
LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0
.toLocalizedString(e));
} catch (AuthenticationRequiredException ex) {
- securityLogWriter.warning(
+ securityLogger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
- new Object[] {proxyID, ex});
+ new Object[] {proxyID, ex}));
writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion);
return;
} catch (AuthenticationFailedException ex) {
- securityLogWriter.warning(
+ securityLogger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
- new Object[] {proxyID, ex});
+ new Object[] {proxyID, ex}));
writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion);
return;
} catch (CacheException e) {
@@ -445,11 +455,9 @@ public class CacheClientNotifier {
return;
}
-
this._statistics.endClientRegistration(startTime);
}
-
/**
* Registers a new client that wants to receive updates with this server.
*
@@ -504,8 +512,9 @@ public class CacheClientNotifier {
"CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.",
proxyId.getDurableId());
}
- l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
- clientVersion, acceptorId, notifyBySubscription);
+ l_proxy = CacheClientProxy.createCacheClientProxy(this, this._cache,
+ this._cache.getDistributedSystem(), SecurityService.getSecurityService(), socket,
+ proxyId, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription);
successful = this.initializeProxy(l_proxy);
} else {
if (proxy.isPrimary()) {
@@ -516,8 +525,8 @@ public class CacheClientNotifier {
qSize = proxy.getQueueSize();
// A proxy exists for this durable client. It must be reinitialized.
if (l_proxy.isPaused()) {
- if (CacheClientProxy.testHook != null) {
- CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT");
+ if (CacheClientProxy.getTestHook() != null) {
+ CacheClientProxy.getTestHook().doTestHook("CLIENT_PRE_RECONNECT");
}
if (l_proxy.lockDrain()) {
try {
@@ -531,8 +540,8 @@ public class CacheClientNotifier {
l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
clientVersion);
l_proxy.setMarkerEnqueued(true);
- if (CacheClientProxy.testHook != null) {
- CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED");
+ if (CacheClientProxy.getTestHook() != null) {
+ CacheClientProxy.getTestHook().doTestHook("CLIENT_RECONNECTED");
}
} finally {
l_proxy.unlockDrain();
@@ -543,8 +552,8 @@ public class CacheClientNotifier {
.toLocalizedString();
logger.warn(unsuccessfulMsg);
responseByte = HandShake.REPLY_REFUSED;
- if (CacheClientProxy.testHook != null) {
- CacheClientProxy.testHook.doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED");
+ if (CacheClientProxy.getTestHook() != null) {
+ CacheClientProxy.getTestHook().doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED");
}
}
} else {
@@ -582,8 +591,9 @@ public class CacheClientNotifier {
if (toCreateNewProxy) {
// Create the new proxy for this non-durable client
- l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
- clientVersion, acceptorId, notifyBySubscription);
+ l_proxy = CacheClientProxy.createCacheClientProxy(this, this._cache,
+ this._cache.getDistributedSystem(), SecurityService.getSecurityService(), socket,
+ proxyId, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription);
successful = this.initializeProxy(l_proxy);
}
}
@@ -754,10 +764,8 @@ public class CacheClientNotifier {
* Unregisters an existing client from this server.
*
* @param memberId Uniquely identifies the client
- *
- *
*/
- public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
+ void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId);
}
@@ -781,8 +789,6 @@ public class CacheClientNotifier {
/**
* The client represented by the proxyId is ready to receive updates.
- *
- * @param proxyId
*/
public void readyForEvents(ClientProxyMembershipID proxyId) {
CacheClientProxy proxy = getClientProxy(proxyId);
@@ -817,7 +823,6 @@ public class CacheClientNotifier {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
instance.singletonNotifyClients(event, null);
-
}
}
@@ -829,7 +834,6 @@ public class CacheClientNotifier {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
instance.singletonNotifyClients(event, cmsg);
-
}
}
@@ -839,10 +843,6 @@ public class CacheClientNotifier {
FilterInfo filterInfo = event.getLocalFilterInfo();
- // if (_logger.fineEnabled()) {
- // _logger.fine("Client dispatcher processing event " + event);
- // }
-
FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile();
if (filterInfo != null) {
// if the routing was made using an old profile we need to recompute it
@@ -964,10 +964,8 @@ public class CacheClientNotifier {
if (filterInfo.filterProcessedLocally) {
removeDestroyTokensFromCqResultKeys(event, filterInfo);
}
-
}
-
private void removeDestroyTokensFromCqResultKeys(InternalCacheEvent event,
FilterInfo filterInfo) {
FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile();
@@ -986,38 +984,22 @@ public class CacheClientNotifier {
}
}
-
/**
* delivers the given message to all proxies for routing. The message should already have client
* interest established, or override the isClientInterested method to implement its own routing
- *
- * @param clientMessage
*/
public static void routeClientMessage(Conflatable clientMessage) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
- instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok
- // to
- // use
- // keySet
- // here
- // because
- // all
- // we
- // do
- // is
- // call
- // getClientProxy
- // with
- // these
- // keys
+ // ok to use keySet here because all we do is call getClientProxy with these keys
+ instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet());
}
}
/*
* this is for server side registration of client queue
*/
- public static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
+ static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
ClientProxyMembershipID clientProxyMembershipId) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
@@ -1029,8 +1011,8 @@ public class CacheClientNotifier {
private void singletonRouteClientMessage(Conflatable conflatable,
Collection<ClientProxyMembershipID> filterClients) {
- this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified
- // but no p2p distribution
+ // bug #43942 - client notified but no p2p distribution
+ this._cache.getCancelCriterion().checkCancelInProgress(null);
List<CacheClientProxy> deadProxies = null;
for (ClientProxyMembershipID clientId : filterClients) {
@@ -1061,7 +1043,8 @@ public class CacheClientNotifier {
* processes the given collection of durable and non-durable client identifiers, returning a
* collection of non-durable identifiers of clients connected to this VM
*/
- public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+ Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+ // TODO: false is ignored here because true is hardcoded in other method
return getProxyIDs(mixedDurableAndNonDurableIDs, false);
}
@@ -1070,7 +1053,7 @@ public class CacheClientNotifier {
* collection of non-durable identifiers of clients connected to this VM. This version can check
* for proxies in initialization as well as fully initialized proxies.
*/
- public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
+ Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
boolean proxyInInitMode) {
Set<ClientProxyMembershipID> result = new HashSet();
for (Object id : mixedDurableAndNonDurableIDs) {
@@ -1209,7 +1192,7 @@ public class CacheClientNotifier {
* @param operation The operation that occurred (e.g. AFTER_CREATE)
* @return whether the <code>CacheClientNotifier</code> supports the input operation
*/
- protected boolean supportsOperation(EnumListenerEvent operation) {
+ private boolean supportsOperation(EnumListenerEvent operation) {
return operation == EnumListenerEvent.AFTER_CREATE
|| operation == EnumListenerEvent.AFTER_UPDATE
|| operation == EnumListenerEvent.AFTER_DESTROY
@@ -1219,87 +1202,6 @@ public class CacheClientNotifier {
|| operation == EnumListenerEvent.AFTER_REGION_INVALIDATE;
}
- // /**
- // * Queues the <code>ClientUpdateMessage</code> to be distributed
- // * to interested clients. This method is not being used currently.
- // * @param clientMessage The <code>ClientUpdateMessage</code> to be queued
- // */
- // protected void notifyClients(final ClientUpdateMessage clientMessage)
- // {
- // if (USE_SYNCHRONOUS_NOTIFICATION)
- // {
- // // Execute the method in the same thread as the caller
- // deliver(clientMessage);
- // }
- // else {
- // // Obtain an Executor and use it to execute the method in its own thread
- // try
- // {
- // getExecutor().execute(new Runnable()
- // {
- // public void run()
- // {
- // deliver(clientMessage);
- // }
- // }
- // );
- // } catch (InterruptedException e)
- // {
- // _logger.warning("CacheClientNotifier: notifyClients interrupted", e);
- // Thread.currentThread().interrupt();
- // }
- // }
- // }
-
- // /**
- // * Updates the information this <code>CacheClientNotifier</code> maintains
- // * for a given edge client. It is invoked when a edge client re-connects to
- // * the server.
- // *
- // * @param clientHost
- // * The host on which the client runs (i.e. the host the
- // * CacheClientNotifier uses to communicate with the
- // * CacheClientUpdater) This is used with the clientPort to uniquely
- // * identify the client
- // * @param clientPort
- // * The port through which the server communicates with the client
- // * (i.e. the port the CacheClientNotifier uses to communicate with
- // * the CacheClientUpdater) This is used with the clientHost to
- // * uniquely identify the client
- // * @param remotePort
- // * The port through which the client communicates with the server
- // * (i.e. the new port the ConnectionImpl uses to communicate with the
- // * ServerConnection)
- // * @param membershipID
- // * Uniquely idenifies the client
- // */
- // public void registerClientPort(String clientHost, int clientPort,
- // int remotePort, ClientProxyMembershipID membershipID)
- // {
- // if (_logger.fineEnabled())
- // _logger.fine("CacheClientNotifier: Registering client port: "
- // + clientHost + ":" + clientPort + " with remote port " + remotePort
- // + " and ID " + membershipID);
- // for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
- // CacheClientProxy proxy = (CacheClientProxy)i.next();
- // if (_logger.finerEnabled())
- // _logger.finer("CacheClientNotifier: Potential client: " + proxy);
- // //if (proxy.representsCacheClientUpdater(clientHost, clientPort))
- // if (proxy.isMember(membershipID)) {
- // if (_logger.finerEnabled())
- // _logger
- // .finer("CacheClientNotifier: Updating remotePorts since host and port are a match");
- // proxy.addPort(remotePort);
- // }
- // else {
- // if (_logger.finerEnabled())
- // _logger.finer("CacheClientNotifier: Host and port "
- // + proxy.getRemoteHostAddress() + ":" + proxy.getRemotePort()
- // + " do not match " + clientHost + ":" + clientPort);
- // }
- // }
- // }
-
/**
* Registers client interest in the input region and key.
*
@@ -1350,18 +1252,6 @@ public class CacheClientNotifier {
}
}
- /*
- * protected void addFilterRegisteredClients(String regionName, ClientProxyMembershipID
- * membershipID) throws RegionNotFoundException { // Update Regions book keeping. LocalRegion
- * region = (LocalRegion)this._cache.getRegion(regionName); if (region == null) { //throw new
- * AssertionError("Could not find region named '" + regionName + "'"); // @todo: see bug 36805 //
- * fix for bug 37979 if (_logger.fineEnabled()) { _logger .fine("CacheClientNotifier: Client " +
- * membershipID + " :Throwing RegionDestroyedException as region: " + regionName +
- * " is not present."); } throw new RegionDestroyedException("registerInterest failed",
- * regionName); } else { region.getFilterProfile().addFilterRegisteredClients(this, membershipID);
- * } }
- */
-
/**
* Store region and delta relation
*
@@ -1457,7 +1347,6 @@ public class CacheClientNotifier {
}
}
-
/**
* If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
* in the haContainer, set the reference to the clientUpdateMessage to null and putInProgress flag
@@ -1484,9 +1373,6 @@ public class CacheClientNotifier {
}
}
}
- // else {
- // This is a replay-of-event case.
- // }
} else {
// This wrapper resides in haContainer.
wrapper.setClientUpdateMessage(null);
@@ -1541,7 +1427,7 @@ public class CacheClientNotifier {
*
* @return the <code>CacheClientProxy</code> associated to the durableClientId
*/
- public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
+ private CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
final boolean isDebugEnabled = logger.isDebugEnabled();
final boolean isTraceEnabled = logger.isTraceEnabled();
@@ -1584,46 +1470,10 @@ public class CacheClientNotifier {
}
/**
- * Returns the <code>CacheClientProxySameDS</code> associated to the membershipID *
- *
- * @return the <code>CacheClientProxy</code> associated to the same distributed system
- */
- public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) {
- final boolean isDebugEnabled = logger.isDebugEnabled();
- if (isDebugEnabled) {
- logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this,
- membershipID);
- logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}",
- this, getClientProxies().size());
- /*
- * _logger.fine(this + "::getClientProxySameDS(), Proxies in the Cache Clinet Notifier: " +
- * getClientProxies());
- */
- }
- CacheClientProxy proxy = null;
- for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
- CacheClientProxy clientProxy = (CacheClientProxy) i.next();
- if (isDebugEnabled) {
- logger.debug("CacheClientNotifier: Checking client {}", clientProxy);
- }
- if (clientProxy.isSameDSMember(membershipID)) {
- proxy = clientProxy;
- if (isDebugEnabled) {
- logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy,
- membershipID);
- }
- break;
- }
- }
- return proxy;
- }
-
-
- /**
* It will remove the clients connected to the passed acceptorId. If its the only server, shuts
* down this instance.
*/
- protected synchronized void shutdown(long acceptorId) {
+ synchronized void shutdown(long acceptorId) {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}",
@@ -1685,14 +1535,14 @@ public class CacheClientNotifier {
*
* @param proxy The <code>CacheClientProxy</code> to add
*/
- protected void addClientProxy(CacheClientProxy proxy) throws IOException {
+ private void addClientProxy(CacheClientProxy proxy) throws IOException {
// this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy);
getCache(); // ensure cache reference is up to date so firstclient state is correct
this._clientProxies.put(proxy.getProxyID(), proxy);
// Remove this proxy from the init proxy list.
removeClientInitProxy(proxy);
this._connectionListener.queueAdded(proxy.getProxyID());
- if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+ if (!(proxy.isClientConflationOn())) {
// Delta not supported with conflation ON
ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
/*
@@ -1704,22 +1554,20 @@ public class CacheClientNotifier {
}
}
this.timedOutDurableClientProxies.remove(proxy.getProxyID());
-
}
- protected void addClientInitProxy(CacheClientProxy proxy) throws IOException {
+ private void addClientInitProxy(CacheClientProxy proxy) throws IOException {
this._initClientProxies.put(proxy.getProxyID(), proxy);
}
- protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
+ private void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
this._initClientProxies.remove(proxy.getProxyID());
}
- protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
+ private boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
return this._initClientProxies.containsKey(proxy.getProxyID());
}
-
/**
* Returns (possibly stale) set of memberIds for all clients being actively notified by this
* server.
@@ -1781,7 +1629,6 @@ public class CacheClientNotifier {
* @since GemFire 5.6
*/
public boolean hasPrimaryForDurableClient(String durableId) {
-
for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
CacheClientProxy proxy = (CacheClientProxy) iter.next();
ClientProxyMembershipID proxyID = proxy.getProxyID();
@@ -1818,7 +1665,9 @@ public class CacheClientNotifier {
return ccp.getQueueSizeStat();
}
- // closes the cq and drains the queue
+ /**
+ * closes the cq and drains the queue
+ */
public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException {
CacheClientProxy proxy = getClientProxy(durableClientId);
// close and drain
@@ -1828,33 +1677,29 @@ public class CacheClientNotifier {
return false;
}
-
/**
* Removes an existing <code>CacheClientProxy</code> from the list of known client proxies
*
* @param proxy The <code>CacheClientProxy</code> to remove
*/
- protected void removeClientProxy(CacheClientProxy proxy) {
- // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new
- // Exception("stack trace"));
+ void removeClientProxy(CacheClientProxy proxy) {
ClientProxyMembershipID client = proxy.getProxyID();
this._clientProxies.remove(client);
this._connectionListener.queueRemoved();
((GemFireCacheImpl) this.getCache()).cleanupForClient(this, client);
- if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+ if (!(proxy.isClientConflationOn())) {
ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
if (chm != null) {
chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal());
}
}
-
}
void durableClientTimedOut(ClientProxyMembershipID client) {
this.timedOutDurableClientProxies.add(client);
}
- public boolean isTimedOut(ClientProxyMembershipID client) {
+ private boolean isTimedOut(ClientProxyMembershipID client) {
return this.timedOutDurableClientProxies.contains(client);
}
@@ -1868,17 +1713,6 @@ public class CacheClientNotifier {
return Collections.unmodifiableCollection(this._clientProxies.values());
}
- // /**
- // * Returns the <code>Executor</code> that delivers messages to the
- // * <code>CacheClientProxy</code> instances.
- // * @return the <code>Executor</code> that delivers messages to the
- // * <code>CacheClientProxy</code> instances
- // */
- // protected Executor getExecutor()
- // {
- // return _executor;
- // }
-
private void closeAllClientCqs(CacheClientProxy proxy) {
CqService cqService = proxy.getCache().getCqService();
if (cqService != null) {
@@ -1901,7 +1735,6 @@ public class CacheClientNotifier {
/**
* Shuts down durable client proxy
- *
*/
public boolean closeDurableClientProxy(String durableClientId) throws CacheException {
CacheClientProxy ccp = getClientProxy(durableClientId);
@@ -1930,8 +1763,9 @@ public class CacheClientNotifier {
final boolean isDebugEnabled = logger.isDebugEnabled();
for (Iterator i = deadProxies.iterator(); i.hasNext();) {
CacheClientProxy proxy = (CacheClientProxy) i.next();
- if (isDebugEnabled)
+ if (isDebugEnabled) {
logger.debug("CacheClientNotifier: Closing dead client: {}", proxy);
+ }
// Close the proxy
boolean keepProxy = false;
@@ -1939,7 +1773,7 @@ public class CacheClientNotifier {
keepProxy = proxy.close(false, stoppedNormally);
} catch (CancelException e) {
throw e;
- } catch (Exception e) {
+ } catch (Exception e) { // TODO: at least log at debug level
}
// Remove the proxy if necessary. It might not be necessary to remove the
@@ -1960,7 +1794,6 @@ public class CacheClientNotifier {
} // for
}
-
/**
* Registers a new <code>InterestRegistrationListener</code> with the set of
* <code>InterestRegistrationListener</code>s.
@@ -1999,18 +1832,16 @@ public class CacheClientNotifier {
}
/**
- *
* @since GemFire 5.8Beta
*/
- protected boolean containsInterestRegistrationListeners() {
+ boolean containsInterestRegistrationListeners() {
return !this.writableInterestRegistrationListeners.isEmpty();
}
/**
- *
* @since GemFire 5.8Beta
*/
- protected void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
+ void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i.hasNext();) {
InterestRegistrationListener listener = (InterestRegistrationListener) i.next();
if (event.isRegister()) {
@@ -2040,8 +1871,6 @@ public class CacheClientNotifier {
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (cache != null) {
this._cache = cache;
- this.logWriter = cache.getInternalLogWriter();
- this.securityLogWriter = cache.getSecurityInternalLogWriter();
}
}
return this._cache;
@@ -2072,68 +1901,6 @@ public class CacheClientNotifier {
}
/**
- * Constructor.
- *
- * @param cache The GemFire <code>Cache</code>
- * @param acceptorStats
- * @param maximumMessageCount
- * @param messageTimeToLive
- * @param listener a listener which should receive notifications abouts queues being added or
- * removed.
- * @param overflowAttributesList
- */
- private CacheClientNotifier(Cache cache, CacheServerStats acceptorStats, int maximumMessageCount,
- int messageTimeToLive, ConnectionListener listener, List overflowAttributesList,
- boolean isGatewayReceiver) {
- // Set the Cache
- this.setCache((GemFireCacheImpl) cache);
- this.acceptorStats = acceptorStats;
- this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms
- // for close
-
- // Set the LogWriter
- this.logWriter = (InternalLogWriter) cache.getLogger();
-
- this._connectionListener = listener;
-
- // Set the security LogWriter
- this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
-
- this.maximumMessageCount = maximumMessageCount;
- this.messageTimeToLive = messageTimeToLive;
-
- // Initialize the statistics
- StatisticsFactory factory;
- if (isGatewayReceiver) {
- factory = new DummyStatisticsFactory();
- } else {
- factory = this.getCache().getDistributedSystem();
- }
- this._statistics = new CacheClientNotifierStats(factory);
-
- // Initialize the executors
- // initializeExecutors(this._logger);
-
- try {
- this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
- if (this.logFrequency <= 0) {
- this.logFrequency = DEFAULT_LOG_FREQUENCY;
- }
- } catch (Exception e) {
- this.logFrequency = DEFAULT_LOG_FREQUENCY;
- }
-
- eventEnqueueWaitTime =
- Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
- if (eventEnqueueWaitTime < 0) {
- eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
- }
-
- // Schedule task to periodically ping clients.
- scheduleClientPingTask();
- }
-
- /**
* this message is used to send interest registration to another server. Since interest
* registration performs a state-flush operation this message must not transmitted on an ordered
* socket
@@ -2228,104 +1995,6 @@ public class CacheClientNotifier {
}
-
- // * Initializes the <code>QueuedExecutor</code> and
- // <code>PooledExecutor</code>
- // * used to deliver messages to <code>CacheClientProxy</code> instances.
- // * @param logger The GemFire <code>LogWriterI18n</code>
- // */
- // private void initializeExecutors(LogWriterI18n logger)
- // {
- // // Create the thread groups
- // final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup("Cache
- // Client Notifier Logger Group", logger);
- // final ThreadGroup notifierGroup =
- // new ThreadGroup("Cache Client Notifier Group")
- // {
- // public void uncaughtException(Thread t, Throwable e)
- // {
- // Thread.dumpStack();
- // loggerGroup.uncaughtException(t, e);
- // //CacheClientNotifier.exceptionInThreads = true;
- // }
- // };
- //
- // // Originally set ThreadGroup to be a daemon, but it was causing the
- // following
- // // exception after five minutes of non-activity (the keep alive time of the
- // // threads in the PooledExecutor.
- //
- // // java.lang.IllegalThreadStateException
- // // at java.lang.ThreadGroup.add(Unknown Source)
- // // at java.lang.Thread.init(Unknown Source)
- // // at java.lang.Thread.<init>(Unknown Source)
- // // at
- // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier$4.newThread(CacheClientNotifier.java:321)
- // // at
- // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.addThread(PooledExecutor.java:512)
- // // at
- // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.execute(PooledExecutor.java:888)
- // // at
- // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.notifyClients(CacheClientNotifier.java:95)
- // // at
- // org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:271)
- //
- // //notifierGroup.setDaemon(true);
- //
- // if (USE_QUEUED_EXECUTOR)
- // createQueuedExecutor(notifierGroup);
- // else
- // createPooledExecutor(notifierGroup);
- // }
-
- // /**
- // * Creates the <code>QueuedExecutor</code> used to deliver messages
- // * to <code>CacheClientProxy</code> instances
- // * @param notifierGroup The <code>ThreadGroup</code> to which the
- // * <code>QueuedExecutor</code>'s <code>Threads</code> belong
- // */
- // protected void createQueuedExecutor(final ThreadGroup notifierGroup)
- // {
- // QueuedExecutor queuedExecutor = new QueuedExecutor(new LinkedQueue());
- // queuedExecutor.setThreadFactory(new ThreadFactory()
- // {
- // public Thread newThread(Runnable command)
- // {
- // Thread thread = new Thread(notifierGroup, command, "Queued Cache Client
- // Notifier");
- // thread.setDaemon(true);
- // return thread;
- // }
- // });
- // _executor = queuedExecutor;
- // }
-
- // /**
- // * Creates the <code>PooledExecutor</code> used to deliver messages
- // * to <code>CacheClientProxy</code> instances
- // * @param notifierGroup The <code>ThreadGroup</code> to which the
- // * <code>PooledExecutor</code>'s <code>Threads</code> belong
- // */
- // protected void createPooledExecutor(final ThreadGroup notifierGroup)
- // {
- // PooledExecutor pooledExecutor = new PooledExecutor(new
- // BoundedLinkedQueue(4096), 50);
- // pooledExecutor.setMinimumPoolSize(10);
- // pooledExecutor.setKeepAliveTime(1000 * 60 * 5);
- // pooledExecutor.setThreadFactory(new ThreadFactory()
- // {
- // public Thread newThread(Runnable command)
- // {
- // Thread thread = new Thread(notifierGroup, command, "Pooled Cache Client
- // Notifier");
- // thread.setDaemon(true);
- // return thread;
- // }
- // });
- // pooledExecutor.createThreads(5);
- // _executor = pooledExecutor;
- // }
-
protected void deliverInterestChange(ClientProxyMembershipID proxyID,
ClientInterestMessageImpl message) {
DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem())
@@ -2471,23 +2140,6 @@ public class CacheClientNotifier {
*/
protected static final int ALL_PORTS = -1;
- // /**
- // * Whether to synchonously deliver messages to proxies.
- // * This is currently hard-coded to true to ensure ordering.
- // */
- // protected static final boolean USE_SYNCHRONOUS_NOTIFICATION =
- // true;
- // Boolean.getBoolean("CacheClientNotifier.USE_SYNCHRONOUS_NOTIFICATION");
-
- // /**
- // * Whether to use the <code>QueuedExecutor</code> (or the
- // * <code>PooledExecutor</code>) to deliver messages to proxies.
- // * Currently, delivery is synchronous. No <code>Executor</code> is
- // * used.
- // */
- // protected static final boolean USE_QUEUED_EXECUTOR =
- // Boolean.getBoolean("CacheClientNotifier.USE_QUEUED_EXECUTOR");
-
/**
* The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to
* CacheClientProxy. Note that the keys in this map are not updated when a durable client
@@ -2512,14 +2164,7 @@ public class CacheClientNotifier {
* direct reference to _cache in CacheClientNotifier code. Instead, you should always use
* <code>getCache()</code>
*/
- private GemFireCacheImpl _cache;
-
- private InternalLogWriter logWriter;
-
- /**
- * The GemFire security <code>LogWriter</code>
- */
- private InternalLogWriter securityLogWriter;
+ private GemFireCacheImpl _cache; // TODO: not thread-safe
/** the maximum number of messages that can be enqueued in a client-queue. */
private int maximumMessageCount;
@@ -2543,10 +2188,6 @@ public class CacheClientNotifier {
*/
private volatile HAContainerWrapper haContainer;
- // /**
- // * The singleton <code>CacheClientNotifier</code> instance
- // */
- // protected static CacheClientNotifier _instance;
/**
* The size of the server-to-client communication socket buffers. This can be modified using the
* BridgeServer.SOCKET_BUFFER_SIZE system property.