You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/08 21:55:42 UTC
[1/4] incubator-geode git commit: GEODE-870: Rejecting of old view
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-870 [created] 0c52268ed
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0759a87/gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java b/gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
index 5798861..942a5c1 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
@@ -22,36 +22,37 @@ import java.io.Serializable;
* This interface provides both {@link Serializable} and {@link
* Runnable}. It is often used in conjunction with {@link
* VM#invoke(Runnable)}.
- *
+ * <p>
* <PRE>
* public void testRegionPutGet() {
- * final Host host = Host.getHost(0);
- * VM vm0 = host.getVM(0);
- * VM vm1 = host.getVM(1);
- * final String name = this.getUniqueName();
- * final Object value = new Integer(42);
- *
- * vm0.invoke(new SerializableRunnable("Put value") {
- * public void run() {
- * ...// get the region //...
- * region.put(name, value);
- * }
- * });
- * vm1.invoke(new SerializableRunnable("Get value") {
- * public void run() {
- * ...// get the region //...
- * assertEquals(value, region.get(name));
- * }
- * });
- * }
+ * final Host host = Host.getHost(0);
+ * VM vm0 = host.getVM(0);
+ * VM vm1 = host.getVM(1);
+ * final String name = this.getUniqueName();
+ * final Object value = new Integer(42);
+ * <p>
+ * vm0.invoke(new SerializableRunnable("Put value") {
+ * public void run() {
+ * ...// get the region //...
+ * region.put(name, value);
+ * }
+ * });
+ * vm1.invoke(new SerializableRunnable("Get value") {
+ * public void run() {
+ * ...// get the region //...
+ * assertEquals(value, region.get(name));
+ * }
+ * });
+ * }
* </PRE>
*/
public abstract class SerializableRunnable
- implements SerializableRunnableIF {
+ implements SerializableRunnableIF {
private static final long serialVersionUID = 7584289978241650456L;
-
+
private String name;
+ private Object[] args;
public SerializableRunnable() {
this.name = null;
@@ -59,23 +60,28 @@ public abstract class SerializableRunnable
/**
* This constructor lets you do the following:
- *
+ * <p>
* <PRE>
* vm.invoke(new SerializableRunnable("Do some work") {
- * public void run() {
- * // ...
- * }
- * });
+ * public void run() {
+ * // ...
+ * }
+ * });
* </PRE>
*/
public SerializableRunnable(String name) {
this.name = name;
}
-
+
+ public SerializableRunnable(String name, Object[] args) {
+ this.name = name;
+ this.args = args;
+ }
+
public void setName(String newName) {
this.name = newName;
}
-
+
public String getName() {
return this.name;
}
[4/4] incubator-geode git commit: Merge branch 'develop' into
feature/GEODE-870
Posted by ud...@apache.org.
Merge branch 'develop' into feature/GEODE-870
# Conflicts:
# gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
# gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0c52268e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0c52268e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0c52268e
Branch: refs/heads/feature/GEODE-870
Commit: 0c52268ed227e26cd0487c5d829486d897152829
Parents: a0759a8 b11113f
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Thu Feb 4 11:04:57 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Thu Feb 4 11:04:57 2016 +1100
----------------------------------------------------------------------
LICENSE | 6 +
NOTICE | 6 +
build.gradle | 527 +-
gemfire-assembly/build.gradle | 41 +-
gemfire-assembly/src/main/dist/LICENSE | 6 +
.../LauncherLifecycleCommandsJUnitTest.java | 4 +-
gemfire-common/build.gradle | 2 +-
gemfire-core/build.gradle | 6 +-
.../gemfire/admin/AdminDistributedSystem.java | 5 -
.../internal/AdminDistributedSystemImpl.java | 3 -
.../gemfire/admin/internal/package.html | 4 +-
.../gemfire/admin/jmx/internal/package.html | 6 +-
.../java/com/gemstone/gemfire/cache/Cache.java | 3 +-
.../gemstone/gemfire/cache/CacheFactory.java | 2 -
.../gemfire/cache/CacheTransactionManager.java | 2 +-
.../com/gemstone/gemfire/cache/DataPolicy.java | 1 -
.../gemfire/cache/DiskStoreFactory.java | 3 -
.../gemstone/gemfire/cache/RegionFactory.java | 2 -
.../gemfire/cache/client/ClientCache.java | 4 +-
.../internal/AutoConnectionSourceImpl.java | 2 +-
.../cache/control/RebalanceOperation.java | 2 +-
.../gemfire/cache/control/ResourceManager.java | 2 +-
.../gemfire/cache/execute/ResultSender.java | 9 +-
.../gemfire/cache/hdfs/HDFSStoreFactory.java | 2 +-
.../org/apache/hadoop/io/SequenceFile.java | 24 +-
.../com/gemstone/gemfire/cache/package.html | 83 +-
.../gemfire/cache/query/QueryService.java | 6 +-
.../query/internal/CompiledSortCriterion.java | 11 -
.../query/internal/index/AbstractIndex.java | 4 -
.../query/internal/index/PartitionedIndex.java | 2 +-
.../gemstone/gemfire/cache/server/package.html | 2 +-
.../gemfire/compression/SnappyCompressor.java | 3 +-
.../distributed/internal/DistributedState.java | 143 -
.../internal/DistributionConfigImpl.java | 2 +-
.../internal/DistributionManager.java | 5 +-
.../internal/ForceDisconnectOperation.java | 42 -
.../internal/InternalDistributedSystem.java | 2 +-
.../distributed/internal/InternalLocator.java | 5 +-
.../distributed/internal/ReplyProcessor21.java | 5 +-
.../distributed/internal/ServerLocator.java | 2 +-
.../distributed/internal/StartupMessage.java | 12 -
.../internal/StartupMessageData.java | 4 +-
.../internal/StartupResponseMessage.java | 16 -
.../internal/deadlock/DeadlockDetector.java | 2 +-
.../internal/deadlock/UnsafeThreadLocal.java | 1 -
.../internal/direct/DirectChannel.java | 7 -
.../internal/locks/DLockGrantor.java | 33 +-
.../membership/InternalDistributedMember.java | 2 +-
.../internal/membership/MemberServices.java | 1 -
.../membership/gms/fd/GMSHealthMonitor.java | 10 +-
.../gms/interfaces/HealthMonitor.java | 1 -
.../membership/gms/interfaces/Manager.java | 1 -
.../gms/locator/FindCoordinatorResponse.java | 1 -
.../membership/gms/membership/GMSJoinLeave.java | 336 +-
.../membership/gms/messenger/Transport.java | 27 +-
.../gms/mgr/GMSMembershipManager.java | 9 +-
.../internal/tcpserver/TcpClient.java | 15 +-
.../internal/tcpserver/TcpServer.java | 17 +-
.../internal/InternalDataSerializer.java | 2 -
.../gemfire/internal/admin/package.html | 4 +-
.../admin/remote/DistributionLocatorId.java | 1 -
.../admin/remote/RemoteBridgeServer.java | 8 +-
.../admin/remote/RemoteRegionAttributes.java | 116 +-
.../gemfire/internal/admin/remote/package.html | 4 +-
.../internal/cache/AbstractRegionMap.java | 133 +-
.../gemfire/internal/cache/BucketAdvisor.java | 6 +-
.../gemfire/internal/cache/CacheObserver.java | 1 -
.../internal/cache/DistTXRollbackMessage.java | 2 +-
.../gemfire/internal/cache/DistTXState.java | 6 +-
.../cache/DistributedCacheOperation.java | 11 +-
.../cache/DistributedClearOperation.java | 8 +-
.../cache/DistributedPutAllOperation.java | 2 +-
.../cache/DistributedRemoveAllOperation.java | 2 +-
.../cache/DistributedTombstoneOperation.java | 15 +-
.../gemfire/internal/cache/EventTracker.java | 2 +-
.../internal/cache/GemFireCacheImpl.java | 2 -
.../internal/cache/IdentityArrayList.java | 2 +-
.../internal/cache/InitialImageOperation.java | 94 +-
.../gemfire/internal/cache/LocalRegion.java | 22 +-
.../internal/cache/PartitionedRegion.java | 2 +-
.../gemfire/internal/cache/QueuedOperation.java | 9 +-
.../internal/cache/RemoteInvalidateMessage.java | 1 -
.../internal/cache/RemoteOperationMessage.java | 16 +-
.../internal/cache/RemotePutAllMessage.java | 10 +-
.../CompressedCachedDeserializable.java | 2 +-
.../internal/cache/control/ResourceAdvisor.java | 138 +-
.../cache/control/ResourceListener.java | 2 +-
.../cache/execute/FunctionServiceStats.java | 1 -
.../cache/execute/InternalExecution.java | 2 -
.../cache/execute/InternalFunctionService.java | 14 +-
.../cache/execute/util/CommitFunction.java | 3 +-
.../cache/execute/util/RollbackFunction.java | 3 +-
.../locks/GFEAbstractQueuedSynchronizer.java | 1715 ------
.../locks/ReentrantReadWriteWriteShareLock.java | 477 --
.../cache/partitioned/FetchEntriesMessage.java | 19 +-
.../cache/partitioned/FetchKeysMessage.java | 20 +-
.../cache/partitioned/IndexCreationMsg.java | 31 +-
.../cache/partitioned/PRTombstoneMessage.java | 19 +-
.../cache/partitioned/PartitionMessage.java | 3 +-
.../cache/partitioned/PutAllPRMessage.java | 10 +-
.../StreamingPartitionOperation.java | 2 -
.../rebalance/PartitionedRegionLoadModel.java | 2 -
.../persistence/PersistenceAdvisorImpl.java | 4 -
.../cache/persistence/PersistentMemberView.java | 1 -
.../region/entry/RegionEntryFactoryBuilder.java | 103 +
.../cache/tier/InternalClientMembership.java | 2 +-
.../cache/tier/sockets/CacheClientProxy.java | 2 +-
.../cache/xmlcache/CacheXmlGenerator.java | 4 +-
.../concurrent/CompactConcurrentHashSet2.java | 6 +-
.../gemfire/internal/jta/GlobalTransaction.java | 13 +-
.../gemfire/internal/offheap/DataAsAddress.java | 7 +-
.../offheap/OffHeapRegionEntryHelper.java | 44 +-
.../internal/process/ProcessController.java | 2 +-
.../gemfire/internal/process/signal/Signal.java | 6 +-
.../internal/redis/ByteArrayWrapper.java | 2 +-
.../internal/redis/ByteToCommandDecoder.java | 2 +-
.../gemstone/gemfire/internal/redis/Coder.java | 2 +-
.../redis/RedisCommandParserException.java | 2 +-
.../internal/redis/RegionCreationException.java | 2 +-
.../gemfire/internal/redis/RegionProvider.java | 1 -
.../redis/executor/AbstractExecutor.java | 8 -
.../redis/executor/hll/HyperLogLogPlus.java | 1 -
.../gemfire/internal/shared/NativeCalls.java | 2 -
.../CustomEntryConcurrentHashMap.java | 14 +-
.../gemfire/management/internal/AgentUtil.java | 1 -
.../management/internal/FederatingManager.java | 2 -
.../management/internal/JmxManagerAdvisor.java | 1 -
.../management/internal/MBeanProxyFactory.java | 2 -
.../configuration/domain/CacheElement.java | 1 -
.../management/internal/web/domain/Link.java | 2 +-
.../internal/web/http/ClientHttpRequest.java | 2 +-
.../internal/web/http/HttpHeader.java | 2 +-
.../main/java/com/gemstone/gemfire/package.html | 6 +-
.../com/gemstone/gemfire/pdx/PdxInstance.java | 2 +-
.../gemfire/redis/GemFireRedisServer.java | 3 +-
.../security/GemFireSecurityException.java | 2 +-
.../src/test/java/com/examples/TestObject.java | 6 +-
.../com/examples/ds/PutDataSerializables.java | 6 +-
.../com/gemstone/gemfire/GemFireTestCase.java | 2 +-
.../internal/AutoConnectionSourceDUnitTest.java | 2 +-
.../management/MXMemoryPoolListenerExample.java | 4 -
.../management/MemoryThresholdsDUnitTest.java | 3 +-
.../MemoryThresholdsOffHeapDUnitTest.java | 3 -
.../gemfire/cache/query/QueryTestUtils.java | 10 -
.../QueryUsingFunctionContextDUnitTest.java | 7 +-
.../query/functional/CountStarJUnitTest.java | 3 -
...ctResultsWithDupValuesInRegionJUnitTest.java | 3 -
.../PRQueryCacheClosedJUnitTest.java | 6 -
.../query/partitioned/PRQueryDUnitHelper.java | 38 -
.../PRQueryRegionClosedJUnitTest.java | 6 -
.../PRQueryRegionDestroyedJUnitTest.java | 5 -
.../query/transaction/QueryAndJtaJUnitTest.java | 1 -
.../gemfire/cache30/CacheLogRollDUnitTest.java | 16 -
.../gemfire/cache30/CacheXml80DUnitTest.java | 17 +-
.../gemfire/cache30/CacheXml81DUnitTest.java | 5 +-
.../MemLRUEvictionControllerDUnitTest.java | 2 +-
.../gemfire/cache30/TXOrderDUnitTest.java | 1 -
.../codeAnalysis/CompiledClassUtils.java | 1 -
.../distributed/AbstractLauncherJUnitTest.java | 1 -
.../distributed/LocatorLauncherJUnitTest.java | 1 -
.../distributed/ServerLauncherJUnitTest.java | 1 -
.../ConsoleDistributionManagerDUnitTest.java | 4 +-
.../InternalDistributedSystemJUnitTest.java | 2 +-
.../gemfire/distributed/internal/LDM.java | 2 +-
.../internal/SharedConfigurationJUnitTest.java | 2 +-
...entrantReadWriteWriteShareLockJUnitTest.java | 458 --
.../gms/membership/GMSJoinLeaveJUnitTest.java | 139 +-
.../gemstone/gemfire/internal/ClassBuilder.java | 1 -
.../internal/DataSerializableJUnitTest.java | 2 +-
.../internal/cache/AbstractRegionJUnitTest.java | 2 +-
.../internal/cache/Bug37244JUnitTest.java | 3 -
.../internal/cache/Bug41733DUnitTest.java | 4 -
.../cache/ClientServerTransactionDUnitTest.java | 9 +-
.../internal/cache/ConflationJUnitTest.java | 12 -
.../gemfire/internal/cache/DiskIdJUnitTest.java | 2 -
.../internal/cache/DiskRegionHelperFactory.java | 3 -
.../internal/cache/DiskRegionTestingBase.java | 2 -
.../cache/DistributedCacheTestCase.java | 6 -
.../internal/cache/HABug36773DUnitTest.java | 4 -
.../HAOverflowMemObjectSizerDUnitTest.java | 5 +-
.../cache/IncrementalBackupDUnitTest.java | 1 -
...gionBucketCreationDistributionDUnitTest.java | 28 -
.../PartitionedRegionCreationDUnitTest.java | 14 -
.../cache/PartitionedRegionDUnitTestCase.java | 6 -
.../cache/PartitionedRegionHADUnitTest.java | 2 +-
.../cache/PartitionedRegionTestHelper.java | 17 -
.../PartitionedRegionTestUtilsDUnitTest.java | 6 +-
.../PartitionedRegionWithSameNameDUnitTest.java | 6 -
.../cache/RemoteTransactionDUnitTest.java | 7 +-
.../control/RebalanceOperationDUnitTest.java | 2 -
.../execute/FunctionServiceStatsDUnitTest.java | 2 -
.../execute/PRFunctionExecutionDUnitTest.java | 9 +-
...ctionExecutionWithResultSenderDUnitTest.java | 2 -
.../mock/AlterMockRegionExtensionFunction.java | 2 +-
.../extension/mock/MockCacheExtension.java | 4 +-
.../cache/ha/Bug36853EventsExpiryDUnitTest.java | 3 -
.../internal/cache/ha/ConflatableObject.java | 2 +-
.../cache/ha/EventIdOptimizationJUnitTest.java | 8 +-
.../internal/cache/ha/HARegionJUnitTest.java | 8 +-
.../cache/ha/HARegionQueueDUnitTest.java | 3 -
.../cache/ha/HARegionQueueJUnitTest.java | 2 +-
.../internal/cache/ha/StatsBugDUnitTest.java | 4 +-
.../PartitionedRegionLoadModelJUnitTest.java | 6 +-
...tentColocatedPartitionedRegionDUnitTest.java | 2 +-
...rtitionedRegionWithTransactionDUnitTest.java | 6 -
.../cache/partitioned/ShutdownAllDUnitTest.java | 4 -
.../RegionEntryFactoryBuilderJUnitTest.java | 85 +
.../CacheServerMaxConnectionsJUnitTest.java | 3 -
.../cache/tier/sockets/CacheServerTestUtil.java | 4 -
.../tier/sockets/ClearPropagationDUnitTest.java | 4 -
.../sockets/ClientHealthMonitorJUnitTest.java | 3 -
.../DestroyEntryPropagationDUnitTest.java | 5 -
.../sockets/NewRegionAttributesDUnitTest.java | 2 -
.../sockets/UpdatePropagationDUnitTest.java | 4 -
.../tier/sockets/command/CommitCommandTest.java | 2 +-
.../cache/wan/CompressionConstants.java | 37 -
.../cache/wan/CompressionInputStream.java | 147 -
.../cache/wan/CompressionOutputStream.java | 123 -
.../compression/SnappyCompressorJUnitTest.java | 2 +-
.../jta/functional/TestXACacheLoader.java | 3 -
.../offheap/OffHeapHelperJUnitTest.java | 314 ++
.../OffHeapRegionEntryHelperJUnitTest.java | 870 +++
.../offheap/OutOfOffHeapMemoryDUnitTest.java | 5 +-
.../statistics/StatisticsDUnitTest.java | 1 -
.../internal/util/ArrayUtilsJUnitTest.java | 1 -
.../gemfire/internal/util/SerializableImpl.java | 2 -
.../util/SerializableImplWithValue.java | 2 -
.../management/CompositeTypeTestDUnitTest.java | 6 -
.../management/DiskManagementDUnitTest.java | 11 -
.../management/DistributedSystemDUnitTest.java | 3 +-
.../management/LocatorManagementDUnitTest.java | 6 -
.../gemfire/management/ManagementTestBase.java | 9 -
...ersalMembershipListenerAdapterDUnitTest.java | 7 +-
.../internal/cli/GfshParserJUnitTest.java | 4 +-
.../cli/commands/ShowMetricsDUnitTest.java | 5 -
.../SharedConfigurationDUnitTest.java | 2 +-
.../configuration/utils/XmlUtilsJUnitTest.java | 8 +-
.../internal/pulse/TestClientIdsDUnitTest.java | 2 -
.../pulse/TestSubscriptionsDUnitTest.java | 2 -
...rDistributedSystemMXBeanIntegrationTest.java | 50 +
...horizeOperationForMBeansIntegrationTest.java | 323 ++
...erationForRegionCommandsIntegrationTest.java | 136 +
.../internal/security/JSONAuthCodeTest.java | 143 -
...JSONAuthorizationDetailsIntegrationTest.java | 163 +
.../security/JSONAuthorizationTest.java | 168 -
...tionCodesForDataCommandsIntegrationTest.java | 101 +
...tionCodesForDistributedSystemMXBeanTest.java | 76 +
.../security/ResourceOperationJUnit.java | 394 --
.../WanCommandsControllerJUnitTest.java | 4 +-
.../security/P2PAuthenticationDUnitTest.java | 2 -
.../gemfire/test/dunit/DistributedTestCase.java | 10 +-
.../gemfire/test/dunit/RMIException.java | 2 +-
.../test/dunit/SerializableCallable.java | 2 +-
.../test/dunit/SerializableRunnable.java | 54 +-
.../com/gemstone/gemfire/test/dunit/VM.java | 6 +-
.../gemstone/gemfire/util/JSR166TestCase.java | 17 +-
.../gemstone/persistence/logging/Logger.java | 2 +-
.../gemfire/GemfireSequenceDisplay.java | 7 -
gemfire-core/src/test/java/hydra/GsRandom.java | 1 -
.../src/test/java/hydra/log/AnyLogWriter.java | 4 +-
.../java/security/AuthzCredentialGenerator.java | 5 +-
.../test/java/security/CredentialGenerator.java | 2 +-
.../sanctionedDataSerializables.txt | 229 +-
gemfire-cq/build.gradle | 23 +
.../cache/client/internal/CloseCQOp.java | 72 +
.../cache/client/internal/CreateCQOp.java | 163 +
.../cache/client/internal/CreateCQWithIROp.java | 92 +
.../cache/client/internal/GetDurableCQsOp.java | 135 +
.../client/internal/ServerCQProxyImpl.java | 111 +
.../gemfire/cache/client/internal/StopCQOp.java | 72 +
.../cache/query/internal/cq/ClientCQImpl.java | 615 +++
.../internal/cq/CqAttributesMutatorImpl.java | 68 +
.../cache/query/internal/cq/CqConflatable.java | 223 +
.../cache/query/internal/cq/CqEventImpl.java | 162 +
.../cache/query/internal/cq/CqListenerImpl.java | 56 +
.../cache/query/internal/cq/CqQueryImpl.java | 383 ++
.../query/internal/cq/CqServiceFactoryImpl.java | 69 +
.../cache/query/internal/cq/CqServiceImpl.java | 2087 +++++++
.../internal/cq/CqServiceStatisticsImpl.java | 100 +
.../query/internal/cq/CqServiceVsdStats.java | 411 ++
.../query/internal/cq/CqStatisticsImpl.java | 75 +
.../cache/query/internal/cq/ServerCQImpl.java | 655 +++
.../tier/sockets/command/BaseCQCommand.java | 59 +
.../cache/tier/sockets/command/CloseCQ.java | 131 +
.../cache/tier/sockets/command/ExecuteCQ.java | 168 +
.../cache/tier/sockets/command/ExecuteCQ61.java | 220 +
.../cache/tier/sockets/command/GetCQStats.java | 100 +
.../tier/sockets/command/GetDurableCQs.java | 143 +
.../cache/tier/sockets/command/MonitorCQ.java | 100 +
.../cache/tier/sockets/command/StopCQ.java | 135 +
...cache.query.internal.cq.spi.CqServiceFactory | 15 +
.../gemfire/cache/query/cq/CQJUnitTest.java | 150 +
.../cache/query/cq/dunit/CqDataDUnitTest.java | 1162 ++++
.../dunit/CqDataOptimizedExecuteDUnitTest.java | 54 +
.../cq/dunit/CqDataUsingPoolDUnitTest.java | 1567 ++++++
...qDataUsingPoolOptimizedExecuteDUnitTest.java | 53 +
.../cache/query/cq/dunit/CqPerfDUnitTest.java | 1044 ++++
.../cq/dunit/CqPerfUsingPoolDUnitTest.java | 1004 ++++
.../cache/query/cq/dunit/CqQueryDUnitTest.java | 4004 ++++++++++++++
.../dunit/CqQueryOptimizedExecuteDUnitTest.java | 311 ++
.../cq/dunit/CqQueryUsingPoolDUnitTest.java | 3322 +++++++++++
...QueryUsingPoolOptimizedExecuteDUnitTest.java | 50 +
.../cq/dunit/CqResultSetUsingPoolDUnitTest.java | 1139 ++++
...ltSetUsingPoolOptimizedExecuteDUnitTest.java | 231 +
.../cache/query/cq/dunit/CqStateDUnitTest.java | 134 +
.../cache/query/cq/dunit/CqStatsDUnitTest.java | 441 ++
.../dunit/CqStatsOptimizedExecuteDUnitTest.java | 50 +
.../cq/dunit/CqStatsUsingPoolDUnitTest.java | 452 ++
...StatsUsingPoolOptimizedExecuteDUnitTest.java | 50 +
.../query/cq/dunit/CqTimeTestListener.java | 266 +
.../PartitionedRegionCqQueryDUnitTest.java | 1788 ++++++
...dRegionCqQueryOptimizedExecuteDUnitTest.java | 246 +
.../query/cq/dunit/PrCqUsingPoolDUnitTest.java | 2029 +++++++
.../PrCqUsingPoolOptimizedExecuteDUnitTest.java | 50 +
.../cache/query/dunit/PdxQueryCQDUnitTest.java | 702 +++
.../cache/query/dunit/PdxQueryCQTestBase.java | 494 ++
.../dunit/QueryIndexUpdateRIDUnitTest.java | 819 +++
.../query/dunit/QueryMonitorDUnitTest.java | 1296 +++++
.../cache/snapshot/ClientSnapshotDUnitTest.java | 284 +
.../AnalyzeCQSerializablesJUnitTest.java | 79 +
.../cache/PRDeltaPropagationDUnitTest.java | 1212 ++++
.../internal/cache/PutAllCSDUnitTest.java | 4419 +++++++++++++++
.../cache/RemoteCQTransactionDUnitTest.java | 1116 ++++
.../internal/cache/ha/CQListGIIDUnitTest.java | 820 +++
.../cache/ha/HADispatcherDUnitTest.java | 695 +++
.../sockets/ClientToServerDeltaDUnitTest.java | 1037 ++++
.../DeltaPropagationWithCQDUnitTest.java | 341 ++
...ToRegionRelationCQRegistrationDUnitTest.java | 786 +++
.../sockets/DurableClientCrashDUnitTest.java | 99 +
.../sockets/DurableClientNetDownDUnitTest.java | 80 +
.../sockets/DurableClientSimpleDUnitTest.java | 3404 ++++++++++++
.../tier/sockets/DurableClientTestCase.java | 2089 +++++++
.../CacheServerManagementDUnitTest.java | 571 ++
.../cli/commands/ClientCommandsDUnitTest.java | 1443 +++++
.../DurableClientCommandsDUnitTest.java | 433 ++
.../internal/pulse/TestCQDUnitTest.java | 147 +
.../internal/pulse/TestClientsDUnitTest.java | 108 +
.../internal/pulse/TestServerDUnitTest.java | 99 +
.../ClientAuthorizationTwoDUnitTest.java | 245 +
.../security/ClientAuthzObjectModDUnitTest.java | 416 ++
.../ClientCQPostAuthorizationDUnitTest.java | 522 ++
.../ClientPostAuthorizationDUnitTest.java | 398 ++
.../gemfire/security/MultiuserAPIDUnitTest.java | 391 ++
.../MultiuserDurableCQAuthzDUnitTest.java | 489 ++
.../gemfire/codeAnalysis/excludedClasses.txt | 2 +
.../gemstone/gemfire/codeAnalysis/openBugs.txt | 21 +
.../sanctionedDataSerializables.txt | 4 +
.../codeAnalysis/sanctionedSerializables.txt | 1 +
.../tier/sockets/durablecq-client-cache.xml | 37 +
.../tier/sockets/durablecq-server-cache.xml | 32 +
.../joptsimple/internal/AbbreviationMap.java | 1 -
.../src/main/java/org/json/JSONObject.java | 3 -
gemfire-junit/build.gradle | 21 +
.../gemfire/test/junit/ConditionalIgnore.java | 49 +
.../gemfire/test/junit/IgnoreCondition.java | 32 +
.../gemfire/test/junit/IgnoreUntil.java | 49 +
.../com/gemstone/gemfire/test/junit/Repeat.java | 43 +
.../com/gemstone/gemfire/test/junit/Retry.java | 38 +
.../test/junit/categories/ContainerTest.java | 25 +
.../test/junit/categories/DistributedTest.java | 25 +
.../categories/DistributedTransactionsTest.java | 26 +
.../test/junit/categories/HydraTest.java | 24 +
.../test/junit/categories/IntegrationTest.java | 25 +
.../test/junit/categories/PerformanceTest.java | 25 +
.../gemfire/test/junit/categories/UITest.java | 24 +
.../gemfire/test/junit/categories/UnitTest.java | 25 +
.../gemfire/test/junit/categories/WanTest.java | 24 +
.../test/junit/rules/ConditionalIgnoreRule.java | 123 +
.../test/junit/rules/ExpectedTimeout.java | 180 +
.../test/junit/rules/ExpectedTimeoutRule.java | 180 +
.../test/junit/rules/IgnoreUntilRule.java | 123 +
.../gemfire/test/junit/rules/RepeatRule.java | 81 +
.../gemfire/test/junit/rules/RetryRule.java | 181 +
.../rules/SerializableExternalResource.java | 107 +
.../test/junit/rules/SerializableRuleChain.java | 119 +
.../rules/SerializableTemporaryFolder.java | 70 +
.../test/junit/rules/SerializableTestName.java | 54 +
.../test/junit/rules/SerializableTestRule.java | 33 +
.../junit/rules/SerializableTestWatcher.java | 29 +
.../test/junit/rules/SerializableTimeout.java | 119 +
.../junit/support/DefaultIgnoreCondition.java | 57 +
.../IgnoreConditionEvaluationException.java | 43 +
.../gemfire/test/junit/ConditionalIgnore.java | 49 -
.../gemfire/test/junit/IgnoreCondition.java | 32 -
.../gemfire/test/junit/IgnoreUntil.java | 49 -
.../com/gemstone/gemfire/test/junit/Repeat.java | 43 -
.../com/gemstone/gemfire/test/junit/Retry.java | 38 -
.../test/junit/categories/ContainerTest.java | 25 -
.../test/junit/categories/DistributedTest.java | 25 -
.../categories/DistributedTransactionsTest.java | 26 -
.../test/junit/categories/HydraTest.java | 24 -
.../test/junit/categories/IntegrationTest.java | 25 -
.../test/junit/categories/PerformanceTest.java | 25 -
.../gemfire/test/junit/categories/UITest.java | 24 -
.../gemfire/test/junit/categories/UnitTest.java | 25 -
.../gemfire/test/junit/categories/WanTest.java | 24 -
.../test/junit/rules/ConditionalIgnoreRule.java | 123 -
.../test/junit/rules/ExpectedTimeout.java | 180 -
.../test/junit/rules/ExpectedTimeoutRule.java | 180 -
.../test/junit/rules/IgnoreUntilRule.java | 123 -
.../gemfire/test/junit/rules/RepeatRule.java | 81 -
.../gemfire/test/junit/rules/RetryRule.java | 181 -
.../rules/SerializableExternalResource.java | 107 -
.../test/junit/rules/SerializableRuleChain.java | 119 -
.../rules/SerializableTemporaryFolder.java | 70 -
.../test/junit/rules/SerializableTestName.java | 54 -
.../test/junit/rules/SerializableTestRule.java | 33 -
.../junit/rules/SerializableTestWatcher.java | 29 -
.../test/junit/rules/SerializableTimeout.java | 119 -
.../junit/support/DefaultIgnoreCondition.java | 57 -
.../IgnoreConditionEvaluationException.java | 43 -
gemfire-lucene/build.gradle | 8 +-
.../cache/lucene/LuceneQueryFactory.java | 11 +-
.../cache/lucene/LuceneQueryProvider.java | 2 +-
.../gemfire/cache/lucene/LuceneService.java | 9 +-
.../internal/repository/RepositoryManager.java | 1 -
gemfire-pulse/build.gradle | 5 +-
.../tools/pulse/internal/data/DataBrowser.java | 3 -
.../pulse/internal/data/JMXDataUpdater.java | 91 -
.../pulse/internal/data/JmxManagerFinder.java | 4 -
.../tools/pulse/internal/json/JSONObject.java | 3 -
.../tools/pulse/internal/json/JSONWriter.java | 2 +-
.../ClusterSelectedRegionsMemberService.java | 3 -
gemfire-rebalancer/build.gradle | 16 +-
.../gemfire/cache/util/AutoBalancer.java | 4 +-
gemfire-site/website/layouts/footer.html | 2 +-
gemfire-wan/build.gradle | 23 +
.../client/internal/GatewaySenderBatchOp.java | 313 ++
.../cache/client/internal/SenderProxy.java | 43 +
.../internal/locator/wan/LocatorDiscovery.java | 227 +
.../internal/locator/wan/LocatorHelper.java | 143 +
.../locator/wan/LocatorJoinMessage.java | 105 +
.../wan/LocatorMembershipListenerImpl.java | 230 +
.../locator/wan/RemoteLocatorJoinRequest.java | 87 +
.../locator/wan/RemoteLocatorJoinResponse.java | 89 +
.../locator/wan/RemoteLocatorPingRequest.java | 56 +
.../locator/wan/RemoteLocatorPingResponse.java | 55 +
.../locator/wan/RemoteLocatorRequest.java | 66 +
.../locator/wan/RemoteLocatorResponse.java | 74 +
.../internal/locator/wan/WANFactoryImpl.java | 74 +
.../locator/wan/WanLocatorDiscovererImpl.java | 138 +
.../cache/wan/AbstractRemoteGatewaySender.java | 169 +
.../cache/wan/GatewayReceiverFactoryImpl.java | 147 +
.../internal/cache/wan/GatewayReceiverImpl.java | 253 +
.../wan/GatewaySenderEventRemoteDispatcher.java | 766 +++
.../cache/wan/GatewaySenderFactoryImpl.java | 389 ++
.../wan/parallel/ParallelGatewaySenderImpl.java | 267 +
...rentParallelGatewaySenderEventProcessor.java | 67 +
...moteParallelGatewaySenderEventProcessor.java | 122 +
...urrentSerialGatewaySenderEventProcessor.java | 45 +
...RemoteSerialGatewaySenderEventProcessor.java | 50 +
.../wan/serial/SerialGatewaySenderImpl.java | 260 +
...ternal.locator.wan.LocatorMembershipListener | 15 +
...ne.gemfire.internal.cache.wan.spi.WANFactory | 15 +
.../cache/CacheXml70GatewayDUnitTest.java | 243 +
.../cache/CacheXml80GatewayDUnitTest.java | 77 +
.../AnalyzeWANSerializablesJUnitTest.java | 91 +
.../internal/cache/UpdateVersionDUnitTest.java | 960 ++++
.../gemfire/internal/cache/wan/WANTestBase.java | 5183 ++++++++++++++++++
...oncurrentParallelGatewaySenderDUnitTest.java | 860 +++
...ntParallelGatewaySenderOffHeapDUnitTest.java | 32 +
...allelGatewaySenderOperation_1_DUnitTest.java | 848 +++
...allelGatewaySenderOperation_2_DUnitTest.java | 537 ++
...tSerialGatewaySenderOperationsDUnitTest.java | 111 +
...GatewaySenderOperationsOffHeapDUnitTest.java | 32 +
.../ConcurrentWANPropogation_1_DUnitTest.java | 608 ++
.../ConcurrentWANPropogation_2_DUnitTest.java | 485 ++
.../cache/wan/disttx/DistTXWANDUnitTest.java | 209 +
.../CommonParallelGatewaySenderDUnitTest.java | 481 ++
...onParallelGatewaySenderOffHeapDUnitTest.java | 32 +
...wWANConcurrencyCheckForDestroyDUnitTest.java | 526 ++
.../cache/wan/misc/PDXNewWanDUnitTest.java | 787 +++
...dRegion_ParallelWANPersistenceDUnitTest.java | 752 +++
...dRegion_ParallelWANPropogationDUnitTest.java | 1132 ++++
.../SenderWithTransportFilterDUnitTest.java | 241 +
...downAllPersistentGatewaySenderDUnitTest.java | 205 +
.../wan/misc/WANConfigurationJUnitTest.java | 609 ++
.../wan/misc/WANLocatorServerDUnitTest.java | 193 +
.../cache/wan/misc/WANSSLDUnitTest.java | 151 +
.../wan/misc/WanAutoDiscoveryDUnitTest.java | 557 ++
.../cache/wan/misc/WanValidationsDUnitTest.java | 1678 ++++++
...tewaySenderOperation_2_OffHeapDUnitTest.java | 32 +
...tewaySenderOperation_2_OffHeapDUnitTest.java | 32 +
...GatewaySenderOperationsOffHeapDUnitTest.java | 34 +
...ewaySenderQueueOverflowOffHeapDUnitTest.java | 34 +
.../ParallelWANConflationOffHeapDUnitTest.java | 34 +
...nceEnabledGatewaySenderOffHeapDUnitTest.java | 34 +
...ropogationConcurrentOpsOffHeapDUnitTest.java | 34 +
.../ParallelWANPropogationOffHeapDUnitTest.java | 34 +
...erialGatewaySenderQueueOffHeapDUnitTest.java | 34 +
...nceEnabledGatewaySenderOffHeapDUnitTest.java | 34 +
.../SerialWANPropogationOffHeapDUnitTest.java | 34 +
...ation_PartitionedRegionOffHeapDUnitTest.java | 34 +
...allelGatewaySenderOperation_2_DUnitTest.java | 38 +
...arallelGatewaySenderOperationsDUnitTest.java | 636 +++
...llelGatewaySenderQueueOverflowDUnitTest.java | 531 ++
.../ParallelWANConflationDUnitTest.java | 496 ++
...ersistenceEnabledGatewaySenderDUnitTest.java | 1823 ++++++
...llelWANPropagationClientServerDUnitTest.java | 114 +
...lelWANPropagationConcurrentOpsDUnitTest.java | 290 +
.../ParallelWANPropagationDUnitTest.java | 1448 +++++
...ParallelWANPropagationLoopBackDUnitTest.java | 425 ++
.../wan/parallel/ParallelWANStatsDUnitTest.java | 632 +++
...tewaySenderDistributedDeadlockDUnitTest.java | 407 ++
...rialGatewaySenderEventListenerDUnitTest.java | 390 ++
.../SerialGatewaySenderOperationsDUnitTest.java | 654 +++
.../SerialGatewaySenderQueueDUnitTest.java | 337 ++
...ersistenceEnabledGatewaySenderDUnitTest.java | 602 ++
.../SerialWANPropagationLoopBackDUnitTest.java | 538 ++
.../serial/SerialWANPropogationDUnitTest.java | 1602 ++++++
...NPropogation_PartitionedRegionDUnitTest.java | 439 ++
.../SerialWANPropogationsFeatureDUnitTest.java | 373 ++
.../wan/serial/SerialWANStatsDUnitTest.java | 596 ++
.../wan/wancommand/WANCommandTestBase.java | 511 ++
...anCommandCreateGatewayReceiverDUnitTest.java | 699 +++
.../WanCommandCreateGatewaySenderDUnitTest.java | 763 +++
...WanCommandGatewayReceiverStartDUnitTest.java | 327 ++
.../WanCommandGatewayReceiverStopDUnitTest.java | 332 ++
.../WanCommandGatewaySenderStartDUnitTest.java | 416 ++
.../WanCommandGatewaySenderStopDUnitTest.java | 367 ++
.../wan/wancommand/WanCommandListDUnitTest.java | 403 ++
.../WanCommandPauseResumeDUnitTest.java | 716 +++
.../wancommand/WanCommandStatusDUnitTest.java | 582 ++
.../management/WANManagementDUnitTest.java | 525 ++
.../ClusterConfigurationDUnitTest.java | 1055 ++++
.../pulse/TestRemoteClusterDUnitTest.java | 274 +
.../gemfire/codeAnalysis/excludedClasses.txt | 2 +
.../gemstone/gemfire/codeAnalysis/openBugs.txt | 21 +
.../sanctionedDataSerializables.txt | 28 +
.../codeAnalysis/sanctionedSerializables.txt | 0
gemfire-web/build.gradle | 3 +-
.../web/util/ConvertUtilsJUnitTest.java | 2 +-
gradle/code-analysis.gradle | 113 +
gradle/dependency-versions.properties | 8 +-
gradle/ide.gradle | 53 +
gradle/java.gradle | 128 +
gradle/publish.gradle | 83 +
gradle/rat.gradle | 4 +
gradle/test.gradle | 241 +
settings.gradle | 2 +
540 files changed, 98829 insertions(+), 8177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c52268e/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --cc gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 5898b23,0b0cfa0..2e5bdc7
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@@ -53,49 -85,34 +53,33 @@@ import static com.gemstone.gemfire.inte
* GMSJoinLeave handles membership communication with other processes in the
* distributed system. It replaces the JGroups channel membership services
* that Geode formerly used for this purpose.
- *
*/
public class GMSJoinLeave implements JoinLeave, MessageHandler {
-
+
public static final String BYPASS_DISCOVERY_PROPERTY = "gemfire.bypass-discovery";
- /**
- * amount of time to wait for responses to FindCoordinatorRequests
- */
+ /** amount of time to wait for responses to FindCoordinatorRequests */
private static final int DISCOVERY_TIMEOUT = Integer.getInteger("gemfire.discovery-timeout", 3000);
- /**
- * amount of time to sleep before trying to join after a failed attempt
- */
+ /** amount of time to sleep before trying to join after a failed attempt */
private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
- /**
- * time to wait for a broadcast message to be transmitted by jgroups
- */
+ /** time to wait for a broadcast message to be transmitted by jgroups */
private static final long BROADCAST_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.broadcast-message-sleep-time", 1000);
- /**
- * if the locators don't know who the coordinator is we send find-coord requests to this many nodes
- */
+ /** if the locators don't know who the coordinator is we send find-coord requests to this many nodes */
private static final int MAX_DISCOVERY_NODES = Integer.getInteger("gemfire.max-discovery-nodes", 30);
- /**
- * interval for broadcasting the current view to members in case they didn't get it the first time
- */
+ /** interval for broadcasting the current view to members in case they didn't get it the first time */
private static final long VIEW_BROADCAST_INTERVAL = Long.getLong("gemfire.view-broadcast-interval", 60000);
- /**
- * membership logger
- */
+ /** membership logger */
private static final Logger logger = Services.getLogger();
- /**
- * the view ID where I entered into membership
- */
+ /** the view ID where I entered into membership */
private int birthViewId;
- /**
- * my address
- */
+ /** my address */
private InternalDistributedMember localAddress;
private Services services;
@@@ -334,11 -309,10 +276,10 @@@
/**
* send a join request and wait for a reply. Process the reply.
* This may throw a SystemConnectException or an AuthenticationFailedException
- *
+ *
- * @param coord
* @return true if the attempt succeeded, false if it timed out
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "WA_NOT_IN_LOOP")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP")
boolean attemptToJoin() {
SearchState state = searchState;
@@@ -476,7 -450,8 +417,7 @@@
recordViewRequest(incomingRequest);
return;
}
-
+
-
InternalDistributedMember mbr = incomingRequest.getMemberID();
if (logger.isDebugEnabled()) {
@@@ -865,10 -837,9 +808,9 @@@
}
private TcpClientWrapper tcpClientWrapper = new TcpClientWrapper();
-
+
/***
* testing purpose
- *
* @param tcpClientWrapper
*/
void setTcpClientWrapper(TcpClientWrapper tcpClientWrapper) {
@@@ -1099,19 -1070,15 +1041,17 @@@
joinResponse.notifyAll();
}
}
-
+
/**
* for testing, do not use in any other case as it is not thread safe
- *
+ * @param req
*/
JoinResponseMessage[] getJoinResponseMessage() {
return joinResponse;
}
+
/***
* for testing purpose
- *
* @param jrm
*/
void setJoinResponseMessage(JoinResponseMessage jrm) {
@@@ -1723,7 -1684,7 +1658,6 @@@
services.getMessenger().sendUnreliably(msg);
}
}
-
-
}
class ViewCreator extends Thread {
@@@ -2195,44 -2157,23 +2130,23 @@@
Set<InternalDistributedMember> newRemovals = new HashSet<>();
Set<InternalDistributedMember> newLeaves = new HashSet<>();
- synchronized (viewRequests) {
- for (DistributionMessage msg : viewRequests) {
- switch (msg.getDSFID()) {
- case LEAVE_REQUEST_MESSAGE:
- newLeaves.add(((LeaveRequestMessage) msg).getMemberID());
- break;
- case REMOVE_MEMBER_REQUEST:
- newRemovals.add(((RemoveMemberMessage) msg).getMemberID());
- break;
- default:
- break;
- }
- }
- }
+ filterMembers(mbrs, newRemovals, REMOVE_MEMBER_REQUEST);
- filterMembers(mbrs, newLeaves, LEAVE_REQUEST_MESSAGE);
-
++ filterMembers(mbrs, newLeaves, LEAVE_REQUEST_MESSAGE);
+
for (InternalDistributedMember mbr : mbrs) {
- if (newRemovals.contains(mbr)) {
- // no need to do a health check on a member who is already leaving
- logger.info("member {} is already scheduled for removal", mbr);
- continue;
- }
- if (newLeaves.contains(mbr)) {
- // no need to do a health check on a member that is declared crashed
- logger.info("member {} has already sent a leave-request", mbr);
- continue;
- }
final InternalDistributedMember fmbr = mbr;
checkers.add(new Callable<InternalDistributedMember>() {
@Override
public InternalDistributedMember call() throws Exception {
// return the member id if it fails health checks
- logger.info("checking state of member " + fmbr);
- if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
- logger.info("member " + fmbr + " passed availability check");
- return fmbr;
+ InternalDistributedMember mbr = GMSJoinLeave.this.checkIfAvailable(fmbr);
-
++
+ synchronized (viewRequests) {
+ if(mbr != null)
+ mbrs.remove(mbr);
+ viewRequests.notifyAll();
}
- logger.info("member " + fmbr + " failed availability check");
- return null;
+ return mbr;
}
});
}
@@@ -2254,34 -2195,78 +2168,78 @@@
});
try {
- List<Future<InternalDistributedMember>> futures;
- futures = svc.invokeAll(checkers);
long giveUpTime = System.currentTimeMillis() + viewAckTimeout;
- for (Future<InternalDistributedMember> future : futures) {
- long now = System.currentTimeMillis();
- try {
- InternalDistributedMember mbr = null;
- long timeToWait = giveUpTime - now;
- if (timeToWait <= 0) {
- // TODO if timeToWait==0 is future.get() guaranteed to return immediately?
- // It looks like some code paths invoke Object.wait(0), which waits forever.
- timeToWait = 1;
- }
- mbr = future.get(timeToWait, TimeUnit.MILLISECONDS);
- if (mbr != null) {
- mbrs.remove(mbr);
+ List<Future<InternalDistributedMember>> futures;
+ futures = submitAll(svc, checkers);
+ long waitTime = giveUpTime - System.currentTimeMillis();
+ synchronized (viewRequests) {
+ while(waitTime>0 ) {
+ logger.debug("removeHealthyMembers: mbrs" + mbrs.size());
-
++
+ filterMembers(mbrs, newRemovals, REMOVE_MEMBER_REQUEST);
- filterMembers(mbrs, newLeaves, LEAVE_REQUEST_MESSAGE);
-
++ filterMembers(mbrs, newLeaves, LEAVE_REQUEST_MESSAGE);
++
+ if(mbrs.isEmpty()) {
+ break;
}
- } catch (java.util.concurrent.TimeoutException e) {
- // timeout - member didn't pass the final check and will not be removed
- // from the collection of members
- } catch (ExecutionException e) {
- logger.info("unexpected exception caught during member verification", e);
-
++
+ viewRequests.wait(waitTime);
+ waitTime = giveUpTime - System.currentTimeMillis();
}
}
-
++
+ //we have waited for all members, now check if we considered any removeRequest;
+ //add them back to create new view
+ if(!newRemovals.isEmpty()) {
+ newRemovals.removeAll(newLeaves);
+ mbrs.addAll(newRemovals);
+ }
-
++
} finally {
svc.shutdownNow();
}
}
+
+ protected void filterMembers(Collection<InternalDistributedMember> mbrs, Set<InternalDistributedMember> removalRequestForMembers, short requestType) {
+ Set<InternalDistributedMember> gotRemovalRequests = getPendingRequestIDs(requestType);
-
++
+ if(!gotRemovalRequests.isEmpty()) {
+ logger.debug("removeHealthyMembers: gotRemovalRequests " + gotRemovalRequests.size());
+ Iterator<InternalDistributedMember> itr = gotRemovalRequests.iterator();
+ while(itr.hasNext()) {
+ InternalDistributedMember removeMember = itr.next();
+ if(mbrs.contains(removeMember)) {
+ testFlagForRemovalRequest = true;
+ removalRequestForMembers.add(removeMember);
+ mbrs.remove(removeMember);
+ }
+ }
+ }
+ }
-
++
+ private <T> List<Future<T>> submitAll ( ExecutorService executor, Collection<? extends Callable<T> > tasks ) {
+ List<Future<T>> result = new ArrayList<Future<T>>( tasks.size() );
+
+ for ( Callable<T> task : tasks ) {
+ result.add(executor.submit(task));
+ }
+
+ return result;
+ }
-
++
+ boolean getTestFlageForRemovalRequest() {
+ return testFlagForRemovalRequest;
+ }
}
-
+
+ InternalDistributedMember checkIfAvailable(InternalDistributedMember fmbr) {
+ // return the member id if it fails health checks
+ logger.info("checking state of member " + fmbr);
+ if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
+ logger.info("member " + fmbr + " passed availability check");
+ return fmbr;
+ }
+ logger.info("member " + fmbr + " failed availability check");
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c52268e/gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
----------------------------------------------------------------------
diff --cc gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
index 942a5c1,658924a..cd2612c
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
@@@ -47,12 -47,11 +47,12 @@@ import java.io.Serializable
* </PRE>
*/
public abstract class SerializableRunnable
- implements SerializableRunnableIF {
+ implements SerializableRunnableIF {
private static final long serialVersionUID = 7584289978241650456L;
-
+
private String name;
+ private Object[] args;
public SerializableRunnable() {
this.name = null;
[2/4] incubator-geode git commit: GEODE-870: Rejecting of old view
Posted by ud...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0759a87/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
index de60132..f4ca966 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
@@ -16,15 +16,6 @@
*/
package com.gemstone.gemfire.distributed;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.LineNumberReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
import com.gemstone.gemfire.ForcedDisconnectException;
import com.gemstone.gemfire.GemFireConfigException;
import com.gemstone.gemfire.LogWriter;
@@ -33,12 +24,7 @@ import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.DistributionException;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.InternalLocator;
-import com.gemstone.gemfire.distributed.internal.MembershipListener;
+import com.gemstone.gemfire.distributed.internal.*;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.MembershipTestHook;
@@ -50,12 +36,14 @@ import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LocalLogWriter;
import com.gemstone.gemfire.internal.tcp.Connection;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.SerializableCallable;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
/**
* Tests the ability of the {@link Locator} API to start and stop
@@ -66,7 +54,7 @@ import com.gemstone.gemfire.test.dunit.VM;
public class LocatorDUnitTest extends DistributedTestCase {
static TestHook hook;
-
+
/**
* Creates a new <code>LocatorDUnitTest</code>
*/
@@ -76,12 +64,12 @@ public class LocatorDUnitTest extends DistributedTestCase {
private static final String WAIT2_MS_NAME = "LocatorDUnitTest.WAIT2_MS";
private static final int WAIT2_MS_DEFAULT = 5000; // 2000 -- see bug 36470
- private static final int WAIT2_MS
+ private static final int WAIT2_MS
= Integer.getInteger(WAIT2_MS_NAME, WAIT2_MS_DEFAULT).intValue();
-
+
private int port1;
private int port2;
-
+
@Override
public void setUp() throws Exception {
super.setUp();
@@ -89,7 +77,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
port2 = -1;
addExpectedException("Removing shunned member");
}
-
+
@Override
public void tearDown2() {
if (Locator.hasLocator()) {
@@ -104,10 +92,9 @@ public class LocatorDUnitTest extends DistributedTestCase {
deleteLocatorStateFile(port2);
}
}
-
+
//////// Test Methods
-
/**
* SQLFire uses a colocated locator in a dm-type=normal VM. This tests that
* the locator can resume control as coordinator after all locators have been
@@ -122,7 +109,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
-
+
port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
deleteLocatorStateFile(port1);
@@ -131,21 +118,21 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("mcast-port", "0");
properties.put("start-locator", locators);
properties.put("log-level", getDUnitLogLevel());
- properties.put("security-peer-auth-init","com.gemstone.gemfire.distributed.AuthInitializer.create");
- properties.put("security-peer-authenticator","com.gemstone.gemfire.distributed.MyAuthenticator.create");
+ properties.put("security-peer-auth-init", "com.gemstone.gemfire.distributed.AuthInitializer.create");
+ properties.put("security-peer-authenticator", "com.gemstone.gemfire.distributed.MyAuthenticator.create");
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
properties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
- system = (InternalDistributedSystem)DistributedSystem.connect(properties);
+ system = (InternalDistributedSystem) DistributedSystem.connect(properties);
InternalDistributedMember mbr = system.getDistributedMember();
assertEquals("expected the VM to have NORMAL vmKind",
DistributionManager.NORMAL_DM_TYPE, system.getDistributedMember().getVmKind());
-
+
properties.remove("start-locator");
properties.put("log-level", getDUnitLogLevel());
properties.put("locators", locators);
SerializableRunnable startSystem = new SerializableRunnable("start system") {
public void run() {
- system = (InternalDistributedSystem)DistributedSystem.connect(properties);
+ system = (InternalDistributedSystem) DistributedSystem.connect(properties);
}
};
vm1.invoke(startSystem);
@@ -155,12 +142,12 @@ public class LocatorDUnitTest extends DistributedTestCase {
Cache cache = CacheFactory.create(system);
Region r = cache.createRegionFactory(RegionShortcut.REPLICATE).create("test region");
assertNotNull("expected to create a region", r);
-
+
// create a lock service and have every vm get a lock
DistributedLockService service = DistributedLockService.create("test service", system);
service.becomeLockGrantor();
service.lock("foo0", 0, 0);
-
+
vm1.invoke(new SerializableRunnable("get the lock service and lock something") {
public void run() {
final DistributedLockService service = DistributedLockService.create("test service", system);
@@ -174,11 +161,10 @@ public class LocatorDUnitTest extends DistributedTestCase {
service.lock("foo2", 0, 0);
}
});
-
-
+
// cause elder failover. vm1 will become the lock grantor
system.disconnect();
-
+
try {
vm1.invoke(new SerializableRunnable("ensure grantor failover") {
public void run() {
@@ -189,26 +175,27 @@ public class LocatorDUnitTest extends DistributedTestCase {
public boolean done() {
return service.isLockGrantor();
}
+
@Override
public String description() {
return "waiting to become lock grantor after shutting down locator/grantor";
}
-
+
}, DistributionConfig.DEFAULT_MEMBER_TIMEOUT * 2, 1000, true);
assertTrue(service.isLockGrantor());
}
});
-
+
properties.put("start-locator", locators);
properties.put("log-level", getDUnitLogLevel());
- system = (InternalDistributedSystem)DistributedSystem.connect(properties);
+ system = (InternalDistributedSystem) DistributedSystem.connect(properties);
System.out.println("done connecting distributed system");
-
+
assertEquals("should be the coordinator", system.getDistributedMember(), MembershipManagerHelper.getCoordinator(system));
NetView view = MembershipManagerHelper.getMembershipManager(system).getView();
getLogWriter().info("view after becoming coordinator is " + view);
- assertNotSame("should not be the first member in the view ("+view+")", system.getDistributedMember(), view.get(0));
-
+ assertNotSame("should not be the first member in the view (" + view + ")", system.getDistributedMember(), view.get(0));
+
service = DistributedLockService.create("test service", system);
// now force a non-elder VM to get a lock. This will hang if the bug is not fixed
@@ -220,7 +207,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
});
assertFalse("should not have become lock grantor", service.isLockGrantor());
-
+
// Now demonstrate that a new member can join and use the lock service
properties.remove("start-locator");
vm3.invoke(startSystem);
@@ -230,7 +217,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
service.lock("foo5", 0, 0);
}
});
-
+
} finally {
disconnectAllFromDS();
}
@@ -241,7 +228,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* split-brain configuration. To work around this we have always told customers that they
* need to stagger the starting of locators. This test configures two locators to start up
* simultaneously and shows that they find each other and form a single system.
- *
+ *
* @throws Exception
*/
public void testStartTwoLocators() throws Exception {
@@ -249,16 +236,16 @@ public class LocatorDUnitTest extends DistributedTestCase {
Host host = Host.getHost(0);
VM loc1 = host.getVM(1);
VM loc2 = host.getVM(2);
-
+
final int port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
this.port1 = port1;
final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
this.port2 = port2; // for cleanup in tearDown2
deleteLocatorStateFile(port1);
deleteLocatorStateFile(port2);
- final String host0 = getServerHostName(host);
+ final String host0 = getServerHostName(host);
final String locators = host0 + "[" + port1 + "]," +
- host0 + "[" + port2 + "]";
+ host0 + "[" + port2 + "]";
final Properties properties = new Properties();
properties.put("mcast-port", "0");
properties.put("locators", locators);
@@ -315,11 +302,11 @@ public class LocatorDUnitTest extends DistributedTestCase {
async2.join();
Object result1 = async1.getReturnValue();
if (result1 instanceof Exception) {
- throw (Exception)result1;
+ throw (Exception) result1;
}
Object result2 = async2.getReturnValue();
if (result2 instanceof Exception) {
- throw (Exception)result2;
+ throw (Exception) result2;
}
// verify that they found each other
SerializableCallable verify = new SerializableCallable("verify no split-brain") {
@@ -330,7 +317,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
Assert.assertTrue(sys.getDM().getViewMembers().size() == 2,
"expected 2 members but found " + sys.getDM().getViewMembers().size()
- );
+ );
return true;
}
};
@@ -351,8 +338,9 @@ public class LocatorDUnitTest extends DistributedTestCase {
loc1.invoke(r);
}
}
-
+
}
+
/**
* test lead member selection
*/
@@ -362,7 +350,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
-
+
port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
deleteLocatorStateFile(port1);
final String locators = getServerHostName(host) + "[" + port1 + "]";
@@ -371,69 +359,69 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("locators", locators);
properties.put("enable-network-partition-detection", "true");
properties.put("disable-auto-reconnect", "true");
-
+
File logFile = new File("");
if (logFile.exists()) {
logFile.delete();
}
Locator locator = Locator.startLocatorAndDS(port1, logFile, properties);
try {
- DistributedSystem sys = locator.getDistributedSystem();
-
- Object[] connectArgs = new Object[]{ properties };
-
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
+ DistributedSystem sys = locator.getDistributedSystem();
+
+ Object[] connectArgs = new Object[] { properties };
+
+ SerializableRunnable disconnect =
+ new SerializableRunnable("Disconnect from " + locators) {
+ public void run() {
+ DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
+ if (sys != null && sys.isConnected()) {
+ sys.disconnect();
+ }
}
- }
- };
-
- assertTrue(MembershipManagerHelper.getLeadMember(sys) == null);
-
- // connect three vms and then watch the lead member selection as they
- // are disconnected/reconnected
- properties.put("name", "vm1");
- DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(),
- "getDistributedMember", connectArgs);
-
-// assertTrue(MembershipManagerHelper.getLeadMember(sys) != null);
- assertLeadMember(mem1, sys, 5000);
-
- properties.put("name", "vm2");
- DistributedMember mem2 = (DistributedMember)vm2.invoke(this.getClass(),
- "getDistributedMember", connectArgs);
- assertLeadMember(mem1, sys, 5000);
-
- properties.put("name", "vm3");
- DistributedMember mem3 = (DistributedMember)vm3.invoke(this.getClass(),
- "getDistributedMember", connectArgs);
- assertLeadMember(mem1, sys, 5000);
-
- // after disconnecting the first vm, the second one should become the leader
- vm1.invoke(disconnect);
- MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1);
- assertLeadMember(mem2, sys, 5000);
-
- properties.put("name", "vm1");
- mem1 = (DistributedMember)vm1.invoke(this.getClass(),
- "getDistributedMember", connectArgs);
- assertLeadMember(mem2, sys, 5000);
-
- vm2.invoke(disconnect);
- MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem2);
- assertLeadMember(mem3, sys, 5000);
-
- vm1.invoke(disconnect);
- MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1);
- assertLeadMember(mem3, sys, 5000);
-
- vm3.invoke(disconnect);
- MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem3);
- assertLeadMember(null, sys, 5000);
+ };
+
+ assertTrue(MembershipManagerHelper.getLeadMember(sys) == null);
+
+ // connect three vms and then watch the lead member selection as they
+ // are disconnected/reconnected
+ properties.put("name", "vm1");
+ DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(),
+ "getDistributedMember", connectArgs);
+
+ // assertTrue(MembershipManagerHelper.getLeadMember(sys) != null);
+ assertLeadMember(mem1, sys, 5000);
+
+ properties.put("name", "vm2");
+ DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(),
+ "getDistributedMember", connectArgs);
+ assertLeadMember(mem1, sys, 5000);
+
+ properties.put("name", "vm3");
+ DistributedMember mem3 = (DistributedMember) vm3.invoke(this.getClass(),
+ "getDistributedMember", connectArgs);
+ assertLeadMember(mem1, sys, 5000);
+
+ // after disconnecting the first vm, the second one should become the leader
+ vm1.invoke(disconnect);
+ MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1);
+ assertLeadMember(mem2, sys, 5000);
+
+ properties.put("name", "vm1");
+ mem1 = (DistributedMember) vm1.invoke(this.getClass(),
+ "getDistributedMember", connectArgs);
+ assertLeadMember(mem2, sys, 5000);
+
+ vm2.invoke(disconnect);
+ MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem2);
+ assertLeadMember(mem3, sys, 5000);
+
+ vm1.invoke(disconnect);
+ MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1);
+ assertLeadMember(mem3, sys, 5000);
+
+ vm3.invoke(disconnect);
+ MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem3);
+ assertLeadMember(null, sys, 5000);
} finally {
locator.stop();
@@ -450,13 +438,14 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
return (lead == null);
}
+
public String description() {
return null;
}
};
DistributedTestCase.waitForCriterion(ev, timeout, 200, true);
}
-
+
/**
* test lead member and coordinator failure with network partition detection
* enabled. It would be nice for this test to have more than two "server"
@@ -482,15 +471,15 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
VM locvm = host.getVM(3);
Locator locator = null;
-
+
final int port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
this.port1 = port1;
final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
this.port2 = port2; // for cleanup in tearDown2()
deleteLocatorStateFile(port1, port2);
- final String host0 = getServerHostName(host);
+ final String host0 = getServerHostName(host);
final String locators = host0 + "[" + port1 + "]," +
- host0 + "[" + port2 + "]";
+ host0 + "[" + port2 + "]";
final Properties properties = new Properties();
properties.put("mcast-port", "0");
properties.put("locators", locators);
@@ -498,9 +487,9 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("disable-auto-reconnect", "true");
properties.put("member-timeout", "2000");
properties.put("log-level", getDUnitLogLevel());
-// properties.put("log-level", "fine");
+ // properties.put("log-level", "fine");
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+
try {
final String uname = getUniqueName();
File logFile = new File("");
@@ -513,50 +502,48 @@ public class LocatorDUnitTest extends DistributedTestCase {
File lf = new File("");
try {
Locator.startLocatorAndDS(port2, lf, properties);
- }
- catch (IOException ios) {
+ } catch (IOException ios) {
fail("Unable to start locator2", ios);
}
}
});
-
- Object[] connectArgs = new Object[]{ properties };
-
- SerializableRunnable crashLocator =
- new SerializableRunnable("Crash locator") {
- public void run() {
- Locator loc = Locator.getLocators().iterator().next();
- DistributedSystem msys = loc.getDistributedSystem();
- MembershipManagerHelper.crashDistributedSystem(msys);
- loc.stop();
- }
- };
+ Object[] connectArgs = new Object[] { properties };
+
+ SerializableRunnable crashLocator =
+ new SerializableRunnable("Crash locator") {
+ public void run() {
+ Locator loc = Locator.getLocators().iterator().next();
+ DistributedSystem msys = loc.getDistributedSystem();
+ MembershipManagerHelper.crashDistributedSystem(msys);
+ loc.stop();
+ }
+ };
assertTrue(MembershipManagerHelper.getLeadMember(sys) == null);
-
-// properties.put("log-level", getDUnitLogLevel());
-
- DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(),
+
+ // properties.put("log-level", getDUnitLogLevel());
+
+ DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(),
"getDistributedMember", connectArgs);
vm2.invoke(this.getClass(),
"getDistributedMember", connectArgs);
assertLeadMember(mem1, sys, 5000);
-
+
assertEquals(sys.getDistributedMember(), MembershipManagerHelper.getCoordinator(sys));
-
+
// crash the second vm and the locator. Should be okay
crashDistributedSystem(vm2);
locvm.invoke(crashLocator);
-
+
assertTrue("Distributed system should not have disconnected",
vm1.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
-
+
// ensure quorumLost is properly invoked
- DistributionManager dm = (DistributionManager)((InternalDistributedSystem)sys).getDistributionManager();
+ DistributionManager dm = (DistributionManager) ((InternalDistributedSystem) sys).getDistributionManager();
MyMembershipListener listener = new MyMembershipListener();
dm.addMembershipListener(listener);
-
+
// disconnect the first vm and demonstrate that the third vm and the
// locator notice the failure and exit
crashDistributedSystem(vm1);
@@ -565,12 +552,13 @@ public class LocatorDUnitTest extends DistributedTestCase {
* It will take 3 * (3 * member-timeout) milliseconds to detect the full
* failure and eject the lost members from the view.
*/
-
+
getLogWriter().info("waiting for my distributed system to disconnect due to partition detection");
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return !sys.isConnected();
}
+
public String description() {
return null;
}
@@ -582,20 +570,18 @@ public class LocatorDUnitTest extends DistributedTestCase {
// quorumLost should be invoked if we get a ForcedDisconnect in this situation
assertTrue("expected quorumLost to be invoked", listener.quorumLostInvoked);
assertTrue("expected suspect processing initiated by TCPConduit", listener.suspectReasons.contains(Connection.INITIATING_SUSPECT_PROCESSING));
- }
- finally {
+ } finally {
if (locator != null) {
locator.stop();
}
LogWriter bLogger =
- new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+ new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
bLogger.info("<ExpectedException action=remove>service failure</ExpectedException>");
bLogger.info("<ExpectedException action=remove>java.net.ConnectException</ExpectedException>");
bLogger.info("<ExpectedException action=remove>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
disconnectAllFromDS();
}
}
-
/**
* test lead member failure and normal coordinator shutdown with network partition detection
@@ -617,7 +603,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
VM locvm = host.getVM(3);
Locator locator = null;
-
+
final int port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
this.port1 = port1;
final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
@@ -625,7 +611,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
deleteLocatorStateFile(port1, port2);
final String host0 = getServerHostName(host);
final String locators = host0 + "[" + port1 + "],"
- + host0 + "[" + port2 + "]";
+ + host0 + "[" + port2 + "]";
final Properties properties = new Properties();
properties.put("mcast-port", "0");
properties.put("locators", locators);
@@ -635,7 +621,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
SerializableRunnable stopLocator = getStopLocatorRunnable();
-
+
try {
final String uname = getUniqueName();
File logFile = new File("");
@@ -647,75 +633,74 @@ public class LocatorDUnitTest extends DistributedTestCase {
try {
Locator.startLocatorAndDS(port2, lf, properties);
MembershipManagerHelper.inhibitForcedDisconnectLogging(true);
- }
- catch (IOException ios) {
+ } catch (IOException ios) {
fail("Unable to start locator2", ios);
}
}
});
-
- Object[] connectArgs = new Object[]{ properties };
-
+
+ Object[] connectArgs = new Object[] { properties };
+
SerializableRunnable crashSystem =
- new SerializableRunnable("Crash system") {
- public void run() {
- DistributedSystem msys = InternalDistributedSystem.getAnyInstance();
- msys.getLogWriter().info("<ExpectedException action=add>service failure</ExpectedException>");
- msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ConnectException</ExpectedException>");
- msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
- MembershipManagerHelper.crashDistributedSystem(msys);
- }
- };
+ new SerializableRunnable("Crash system") {
+ public void run() {
+ DistributedSystem msys = InternalDistributedSystem.getAnyInstance();
+ msys.getLogWriter().info("<ExpectedException action=add>service failure</ExpectedException>");
+ msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ConnectException</ExpectedException>");
+ msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
+ MembershipManagerHelper.crashDistributedSystem(msys);
+ }
+ };
assertTrue(MembershipManagerHelper.getLeadMember(sys) == null);
-
- DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(),
+
+ DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(),
"getDistributedMember", connectArgs);
- DistributedMember mem2 = (DistributedMember)vm2.invoke(this.getClass(),
+ DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(),
"getDistributedMember", connectArgs);
assertEquals(mem1, MembershipManagerHelper.getLeadMember(sys));
-
+
assertEquals(sys.getDistributedMember(), MembershipManagerHelper.getCoordinator(sys));
-
+
MembershipManagerHelper.inhibitForcedDisconnectLogging(true);
// crash the lead vm. Should be okay
vm1.invoke(crashSystem);
pause(4 * 2000); // 4 x the member-timeout
-
+
assertTrue("Distributed system should not have disconnected",
isSystemConnected());
-
+
assertTrue("Distributed system should not have disconnected",
vm2.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
-
+
assertTrue("Distributed system should not have disconnected",
locvm.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
// stop the locator normally. This should also be okay
locator.stop();
-
+
if (!Locator.getLocators().isEmpty()) {
// log this for debugging purposes before throwing assertion error
getLogWriter().warning("found locator " + Locator.getLocators().iterator().next());
}
assertTrue("locator is not stopped", Locator.getLocators().isEmpty());
-
+
assertTrue("Distributed system should not have disconnected",
vm2.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
-
+
assertTrue("Distributed system should not have disconnected",
locvm.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
// the remaining non-locator member should now be the lead member
assertEquals("This test sometimes fails. If the log contains " +
- "'failed to collect all ACKs' it is a false failure.",
- mem2, vm2.invoke(LocatorDUnitTest.class, "getLeadMember", new Object[]{}));
-
+ "'failed to collect all ACKs' it is a false failure.",
+ mem2, vm2.invoke(LocatorDUnitTest.class, "getLeadMember", new Object[] {}));
+
SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
+ new SerializableRunnable("Disconnect from " + locators) {
public void run() {
DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
if (sys != null && sys.isConnected()) {
@@ -728,8 +713,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
// locator notice the failure and exit
vm2.invoke(disconnect);
locvm.invoke(stopLocator);
- }
- finally {
+ } finally {
MembershipManagerHelper.inhibitForcedDisconnectLogging(false);
if (locator != null) {
locator.stop();
@@ -764,7 +748,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
VM locvm = host.getVM(3);
Locator locator = null;
-
+
final int port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
this.port1 = port1;
final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
@@ -783,7 +767,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
SerializableRunnable stopLocator = getStopLocatorRunnable();
-
+
try {
final String uname = getUniqueName();
File logFile = new File("");
@@ -794,69 +778,68 @@ public class LocatorDUnitTest extends DistributedTestCase {
File lf = new File("");
try {
Locator.startLocatorAndDS(port2, lf, properties);
- }
- catch (IOException ios) {
+ } catch (IOException ios) {
fail("Unable to start locator2", ios);
}
}
});
-
- Object[] connectArgs = new Object[]{ properties };
-
+
+ Object[] connectArgs = new Object[] { properties };
+
SerializableRunnable crashSystem =
- new SerializableRunnable("Crash system") {
- public void run() {
- DistributedSystem msys = InternalDistributedSystem.getAnyInstance();
- msys.getLogWriter().info("<ExpectedException action=add>service failure</ExpectedException>");
- msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ConnectException</ExpectedException>");
- msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
- msys.getLogWriter().info("<ExpectedException action=add>Possible loss of quorum</ExpectedException>");
- hook = new TestHook();
- MembershipManagerHelper.getMembershipManager(msys).registerTestHook(hook);
- try {
- MembershipManagerHelper.crashDistributedSystem(msys);
- } finally {
- hook.reset();
+ new SerializableRunnable("Crash system") {
+ public void run() {
+ DistributedSystem msys = InternalDistributedSystem.getAnyInstance();
+ msys.getLogWriter().info("<ExpectedException action=add>service failure</ExpectedException>");
+ msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ConnectException</ExpectedException>");
+ msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
+ msys.getLogWriter().info("<ExpectedException action=add>Possible loss of quorum</ExpectedException>");
+ hook = new TestHook();
+ MembershipManagerHelper.getMembershipManager(msys).registerTestHook(hook);
+ try {
+ MembershipManagerHelper.crashDistributedSystem(msys);
+ } finally {
+ hook.reset();
+ }
}
- }
- };
+ };
assertTrue(MembershipManagerHelper.getLeadMember(sys) == null);
-
- final DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(),
+
+ final DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(),
"getDistributedMember", connectArgs);
- final DistributedMember mem2 = (DistributedMember)vm2.invoke(this.getClass(),
+ final DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(),
"getDistributedMember", connectArgs);
assertEquals(mem1, MembershipManagerHelper.getLeadMember(sys));
-
+
assertEquals(sys.getDistributedMember(), MembershipManagerHelper.getCoordinator(sys));
-
+
// crash the lead vm. Should be okay. it should hang in test hook thats
// why call is asynchronous.
//vm1.invokeAsync(crashSystem);
assertTrue("Distributed system should not have disconnected",
isSystemConnected());
-
+
assertTrue("Distributed system should not have disconnected",
vm2.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
-
+
assertTrue("Distributed system should not have disconnected",
locvm.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
vm2.invokeAsync(crashSystem);
pause(1000); // 4 x the member-timeout
-
+
// request member removal for first peer from second peer.
vm2.invoke(new SerializableRunnable("Request Member Removal") {
-
+
@Override
public void run() {
DistributedSystem msys = InternalDistributedSystem.getAnyInstance();
MembershipManager mmgr = MembershipManagerHelper.getMembershipManager(msys);
-
+
// check for shutdown cause in MembershipManager. Following call should
// throw DistributedSystemDisconnectedException which should have cause as
// ForceDisconnectException.
@@ -876,8 +859,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
});
- }
- finally {
+ } finally {
if (locator != null) {
locator.stop();
}
@@ -903,7 +885,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
VM locvm = host.getVM(3);
Locator locator = null;
-
+
final int port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
this.port1 = port1;
final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
@@ -911,7 +893,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
deleteLocatorStateFile(port1, port2);
final String host0 = getServerHostName(host);
final String locators = host0 + "[" + port1 + "],"
- + host0 + "[" + port2 + "]";
+ + host0 + "[" + port2 + "]";
final Properties properties = new Properties();
properties.put("mcast-port", "0");
properties.put("locators", locators);
@@ -921,7 +903,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
+ new SerializableRunnable("Disconnect from " + locators) {
public void run() {
DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
if (sys != null && sys.isConnected()) {
@@ -931,11 +913,11 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
};
SerializableRunnable expectedException =
- new SerializableRunnable("Add expected exceptions") {
- public void run() {
- MembershipManagerHelper.inhibitForcedDisconnectLogging(true);
- }
- };
+ new SerializableRunnable("Add expected exceptions") {
+ public void run() {
+ MembershipManagerHelper.inhibitForcedDisconnectLogging(true);
+ }
+ };
try {
final String uname = getUniqueName();
locvm.invoke(new SerializableRunnable() {
@@ -943,8 +925,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
File lf = new File("");
try {
Locator.startLocatorAndDS(port2, lf, properties);
- }
- catch (IOException ios) {
+ } catch (IOException ios) {
fail("Unable to start locator1", ios);
}
}
@@ -954,46 +935,46 @@ public class LocatorDUnitTest extends DistributedTestCase {
locator = Locator.startLocatorAndDS(port1, logFile, properties);
DistributedSystem sys = locator.getDistributedSystem();
sys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
- Object[] connectArgs = new Object[]{ properties };
-
+ Object[] connectArgs = new Object[] { properties };
+
SerializableRunnable crashLocator =
- new SerializableRunnable("Crash locator") {
- public void run() {
- Locator loc = Locator.getLocators().iterator().next();
- DistributedSystem msys = loc.getDistributedSystem();
- msys.getLogWriter().info("<ExpectedException action=add>service failure</ExpectedException>");
- msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
- msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ConnectException</ExpectedException>");
- MembershipManagerHelper.crashDistributedSystem(msys);
- loc.stop();
- }
- };
+ new SerializableRunnable("Crash locator") {
+ public void run() {
+ Locator loc = Locator.getLocators().iterator().next();
+ DistributedSystem msys = loc.getDistributedSystem();
+ msys.getLogWriter().info("<ExpectedException action=add>service failure</ExpectedException>");
+ msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
+ msys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ConnectException</ExpectedException>");
+ MembershipManagerHelper.crashDistributedSystem(msys);
+ loc.stop();
+ }
+ };
assertTrue(MembershipManagerHelper.getLeadMember(sys) == null);
-
- DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(),
+
+ DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(),
"getDistributedMember", connectArgs);
vm1.invoke(expectedException);
- DistributedMember mem2 = (DistributedMember)vm2.invoke(this.getClass(),
+ DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(),
"getDistributedMember", connectArgs);
-
- DistributedMember loc1Mbr = (DistributedMember)locvm.invoke(this.getClass(),
- "getLocatorDistributedMember", new Object[]{});
+
+ DistributedMember loc1Mbr = (DistributedMember) locvm.invoke(this.getClass(),
+ "getLocatorDistributedMember", new Object[] {});
assertLeadMember(mem1, sys, 5000);
-
+
assertEquals(loc1Mbr, MembershipManagerHelper.getCoordinator(sys));
-
+
// crash the lead locator. Should be okay
locvm.invoke(crashLocator);
pause(10 * 1000);
assertTrue("Distributed system should not have disconnected",
sys.isConnected());
-
+
assertTrue("Distributed system should not have disconnected",
vm1.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
-
+
assertTrue("Distributed system should not have disconnected",
vm2.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
@@ -1001,18 +982,17 @@ public class LocatorDUnitTest extends DistributedTestCase {
// locator notice the failure and continue to run
vm1.invoke(disconnect);
pause(10 * 1000);
-
+
assertTrue("Distributed system should not have disconnected",
vm2.invokeBoolean(LocatorDUnitTest.class, "isSystemConnected"));
-
+
assertEquals(sys.getDistributedMember(),
MembershipManagerHelper.getCoordinator(sys));
assertEquals(mem2, MembershipManagerHelper.getLeadMember(sys));
-
- }
- finally {
+
+ } finally {
vm2.invoke(disconnect);
-
+
if (locator != null) {
locator.stop();
}
@@ -1028,7 +1008,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
disconnectAllFromDS();
Host host = Host.getHost(0);
int port =
- AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
deleteLocatorStateFile(port1);
String locators = getServerHostName(host) + "[" + port + "]";
Properties props = new Properties();
@@ -1036,17 +1016,17 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("locators", locators);
final String expected = "java.net.ConnectException";
- final String addExpected =
- "<ExpectedException action=add>" + expected + "</ExpectedException>";
- final String removeExpected =
- "<ExpectedException action=remove>" + expected + "</ExpectedException>";
-
+ final String addExpected =
+ "<ExpectedException action=add>" + expected + "</ExpectedException>";
+ final String removeExpected =
+ "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
LogWriter bgexecLogger =
- new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+ new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
bgexecLogger.info(addExpected);
-
+
boolean exceptionOccurred = true;
- String oldValue = (String)System.getProperties().put("p2p.joinTimeout", "15000");
+ String oldValue = (String) System.getProperties().put("p2p.joinTimeout", "15000");
try {
DistributedSystem.connect(props);
exceptionOccurred = false;
@@ -1057,13 +1037,12 @@ public class LocatorDUnitTest extends DistributedTestCase {
} catch (GemFireConfigException ex) {
String s = ex.getMessage();
assertTrue(s.indexOf("Locator does not exist") >= 0);
-
+
} catch (Exception ex) {
// if you see this fail, determine if unexpected exception is expected
// if expected then add in a catch block for it above this catch
fail("Failed with unexpected exception", ex);
- }
- finally {
+ } finally {
if (oldValue == null) {
System.getProperties().remove("p2p.joinTimeout");
} else {
@@ -1091,111 +1070,112 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
final int port =
- AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
deleteLocatorStateFile(port1);
final String locators = getServerHostName(host) + "[" + port + "]";
final String uniqueName = getUniqueName();
-
+
vm0.invoke(new SerializableRunnable("Start locator " + locators) {
- public void run() {
- File logFile = new File("");
- try {
- Properties locProps = new Properties();
- locProps.setProperty("mcast-port", "0");
- locProps.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ public void run() {
+ File logFile = new File("");
+ try {
+ Properties locProps = new Properties();
+ locProps.setProperty("mcast-port", "0");
+ locProps.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- Locator.startLocatorAndDS(port, logFile, locProps);
- } catch (IOException ex) {
- fail("While starting locator on port " + port, ex);
- }
+ Locator.startLocatorAndDS(port, logFile, locProps);
+ } catch (IOException ex) {
+ fail("While starting locator on port " + port, ex);
}
- });
+ }
+ });
try {
- SerializableRunnable connect =
- new SerializableRunnable("Connect to " + locators) {
- public void run() {
- //System.setProperty("p2p.joinTimeout", "5000");
- Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", locators);
- DistributedSystem.connect(props);
- }
- };
- vm1.invoke(connect);
- vm2.invoke(connect);
-
- Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", locators);
-
- system = (InternalDistributedSystem)DistributedSystem.connect(props);
-
- final DistributedMember coord = MembershipManagerHelper.getCoordinator(system);
- getLogWriter().info("coordinator before termination of locator is " + coord);
+ SerializableRunnable connect =
+ new SerializableRunnable("Connect to " + locators) {
+ public void run() {
+ //System.setProperty("p2p.joinTimeout", "5000");
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", locators);
+ DistributedSystem.connect(props);
+ }
+ };
+ vm1.invoke(connect);
+ vm2.invoke(connect);
- vm0.invoke(getStopLocatorRunnable());
-
- // now ensure that one of the remaining members became the coordinator
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return !coord.equals(MembershipManagerHelper.getCoordinator(system));
- }
- public String description() {
- return "expected the coordinator to be " + coord + " but it is " +
- MembershipManagerHelper.getCoordinator(system);
- }
- };
- DistributedTestCase.waitForCriterion(ev, 15 * 1000, 200, true);
- DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system);
- getLogWriter().info("coordinator after shutdown of locator was " +
- newCoord);
- if (coord.equals(newCoord)) {
- fail("another member should have become coordinator after the locator was stopped");
- }
-
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", locators);
- system.disconnect();
+ system = (InternalDistributedSystem) DistributedSystem.connect(props);
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
- }
- }
- };
- vm1.invoke(disconnect);
- vm2.invoke(disconnect);
+ final DistributedMember coord = MembershipManagerHelper.getCoordinator(system);
+ getLogWriter().info("coordinator before termination of locator is " + coord);
- } finally {
vm0.invoke(getStopLocatorRunnable());
- }
- }
-// public void testRepeat() throws Exception {
-// for (int i=0; i<10; i++) {
-// System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> run #"+i);
-// testLocatorBecomesCoordinator();
-// tearDown();
-// setUp();
-// }
-// }
- /**
- * Tests starting one locator in a remote VM and having multiple
- * members of the distributed system join it. This ensures that
- * members start up okay, and that handling of a stopped locator
- * is correct. It then restarts the locator to demonstrate that
- * it can connect to and function as the group coordinator
+ // now ensure that one of the remaining members became the coordinator
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return !coord.equals(MembershipManagerHelper.getCoordinator(system));
+ }
+
+ public String description() {
+ return "expected the coordinator to be " + coord + " but it is " +
+ MembershipManagerHelper.getCoordinator(system);
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 15 * 1000, 200, true);
+ DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system);
+ getLogWriter().info("coordinator after shutdown of locator was " +
+ newCoord);
+ if (coord.equals(newCoord)) {
+ fail("another member should have become coordinator after the locator was stopped");
+ }
+
+ system.disconnect();
+
+ SerializableRunnable disconnect =
+ new SerializableRunnable("Disconnect from " + locators) {
+ public void run() {
+ DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
+ if (sys != null && sys.isConnected()) {
+ sys.disconnect();
+ }
+ }
+ };
+ vm1.invoke(disconnect);
+ vm2.invoke(disconnect);
+
+ } finally {
+ vm0.invoke(getStopLocatorRunnable());
+ }
+ }
+
+ // public void testRepeat() throws Exception {
+ // for (int i=0; i<10; i++) {
+ // System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> run #"+i);
+ // testLocatorBecomesCoordinator();
+ // tearDown();
+ // setUp();
+ // }
+ // }
+
+ /**
+ * Tests starting one locator in a remote VM and having multiple
+ * members of the distributed system join it. This ensures that
+ * members start up okay, and that handling of a stopped locator
+ * is correct. It then restarts the locator to demonstrate that
+ * it can connect to and function as the group coordinator
*/
public void testLocatorBecomesCoordinator() throws Exception {
disconnectAllFromDS();
final String expected = "java.net.ConnectException";
- final String addExpected =
- "<ExpectedException action=add>" + expected + "</ExpectedException>";
- final String removeExpected =
- "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+ final String addExpected =
+ "<ExpectedException action=add>" + expected + "</ExpectedException>";
+ final String removeExpected =
+ "<ExpectedException action=remove>" + expected + "</ExpectedException>";
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
@@ -1203,111 +1183,115 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
final int port =
- AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
deleteLocatorStateFile(port1);
final String locators = getServerHostName(host) + "[" + port + "]";
-
- vm0.invoke(getStartSBLocatorRunnable(port, getUniqueName()+"1"));
+
+ vm0.invoke(getStartSBLocatorRunnable(port, getUniqueName() + "1"));
try {
- final Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", locators);
- props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ final Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", locators);
+ props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- SerializableRunnable connect =
- new SerializableRunnable("Connect to " + locators) {
- public void run() {
- //System.setProperty("p2p.joinTimeout", "5000");
- DistributedSystem sys = getSystem(props);
- sys.getLogWriter().info(addExpected);
- }
- };
- vm1.invoke(connect);
- vm2.invoke(connect);
+ SerializableRunnable connect =
+ new SerializableRunnable("Connect to " + locators) {
+ public void run() {
+ //System.setProperty("p2p.joinTimeout", "5000");
+ DistributedSystem sys = getSystem(props);
+ sys.getLogWriter().info(addExpected);
+ }
+ };
+ vm1.invoke(connect);
+ vm2.invoke(connect);
- system = (InternalDistributedSystem)getSystem(props);
+ system = (InternalDistributedSystem) getSystem(props);
- final DistributedMember coord = MembershipManagerHelper.getCoordinator(system);
- getLogWriter().info("coordinator before termination of locator is " + coord);
+ final DistributedMember coord = MembershipManagerHelper.getCoordinator(system);
+ getLogWriter().info("coordinator before termination of locator is " + coord);
- vm0.invoke(getStopLocatorRunnable());
-
- // now ensure that one of the remaining members became the coordinator
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return !coord.equals(MembershipManagerHelper.getCoordinator(system));
- }
- public String description() {
- return "expected the coordinator to be " + coord + " but it is " +
- MembershipManagerHelper.getCoordinator(system);
- }
- };
- DistributedTestCase.waitForCriterion(ev, 15000, 200, true);
- DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system);
- getLogWriter().info("coordinator after shutdown of locator was " +
- newCoord);
- if (newCoord == null || coord.equals(newCoord)) {
- fail("another member should have become coordinator after the locator was stopped: "
- + newCoord);
- }
-
-
- // restart the locator to demonstrate reconnection & make disconnects faster
- // it should also regain the role of coordinator, so we check to make sure
- // that the coordinator has changed
- vm0.invoke(getStartSBLocatorRunnable(port, getUniqueName()+"2"));
-
- final DistributedMember tempCoord = newCoord;
- ev = new WaitCriterion() {
- public boolean done() {
- return !tempCoord.equals(MembershipManagerHelper.getCoordinator(system));
- }
- public String description() {
- return null;
+ vm0.invoke(getStopLocatorRunnable());
+
+ // now ensure that one of the remaining members became the coordinator
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return !coord.equals(MembershipManagerHelper.getCoordinator(system));
+ }
+
+ public String description() {
+ return "expected the coordinator to be " + coord + " but it is " +
+ MembershipManagerHelper.getCoordinator(system);
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 15000, 200, true);
+ DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system);
+ getLogWriter().info("coordinator after shutdown of locator was " +
+ newCoord);
+ if (newCoord == null || coord.equals(newCoord)) {
+ fail("another member should have become coordinator after the locator was stopped: "
+ + newCoord);
}
- };
- DistributedTestCase.waitForCriterion(ev, 5000, 200, true);
-
- system.disconnect();
- LogWriter bgexecLogger =
- new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
- bgexecLogger.info(removeExpected);
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
+ // restart the locator to demonstrate reconnection & make disconnects faster
+ // it should also regain the role of coordinator, so we check to make sure
+ // that the coordinator has changed
+ vm0.invoke(getStartSBLocatorRunnable(port, getUniqueName() + "2"));
+
+ final DistributedMember tempCoord = newCoord;
+ ev = new WaitCriterion() {
+ public boolean done() {
+ return !tempCoord.equals(MembershipManagerHelper.getCoordinator(system));
+ }
+
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 5000, 200, true);
+
+ system.disconnect();
+ LogWriter bgexecLogger =
+ new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+ bgexecLogger.info(removeExpected);
+
+ SerializableRunnable disconnect =
+ new SerializableRunnable("Disconnect from " + locators) {
+ public void run() {
+ DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
+ if (sys != null && sys.isConnected()) {
+ sys.disconnect();
+ }
+ // connectExceptions occur during disconnect, so we need the
+ // expectedexception hint to be in effect until this point
+ LogWriter bLogger =
+ new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+ bLogger.info(removeExpected);
}
- // connectExceptions occur during disconnect, so we need the
- // expectedexception hint to be in effect until this point
- LogWriter bLogger =
- new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
- bLogger.info(removeExpected);
- }
- };
- vm1.invoke(disconnect);
- vm2.invoke(disconnect);
- vm0.invoke(getStopLocatorRunnable());
+ };
+ vm1.invoke(disconnect);
+ vm2.invoke(disconnect);
+ vm0.invoke(getStopLocatorRunnable());
} finally {
vm0.invoke(getStopLocatorRunnable());
}
}
-
- /** set a short locator refresh rate */
+ /**
+ * set a short locator refresh rate
+ */
public static void setShortRefreshWait() {
System.setProperty("p2p.gossipRefreshRate", "2000");
}
-
- /** remove shortened locator refresh rate */
+
+ /**
+ * remove shortened locator refresh rate
+ */
public static void resetRefreshWait() {
System.getProperties().remove("p2p.gossipRefreshRate");
}
-
+
public static boolean isSystemConnected() {
DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
if (sys != null && sys.isConnected()) {
@@ -1315,12 +1299,10 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
return false;
}
-
static boolean beforeFailureNotificationReceived;
static boolean afterFailureNotificationReceived;
-
/**
* Tests starting multiple locators in multiple VMs.
*/
@@ -1338,15 +1320,15 @@ public class LocatorDUnitTest extends DistributedTestCase {
final int port2 = freeTCPPorts[1];
this.port2 = port2;
deleteLocatorStateFile(port1, port2);
- final String host0 = getServerHostName(host);
+ final String host0 = getServerHostName(host);
final String locators = host0 + "[" + port1 + "]," +
- host0 + "[" + port2 + "]";
+ host0 + "[" + port2 + "]";
final Properties dsProps = new Properties();
dsProps.setProperty("locators", locators);
dsProps.setProperty("mcast-port", "0");
dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-// dsProps.setProperty("log-level", "finest");
+ // dsProps.setProperty("log-level", "finest");
final String uniqueName = getUniqueName();
vm0.invoke(new SerializableRunnable("Start locator on " + port1) {
@@ -1377,14 +1359,14 @@ public class LocatorDUnitTest extends DistributedTestCase {
try {
SerializableRunnable connect =
- new SerializableRunnable("Connect to " + locators) {
- public void run() {
- Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", locators);
- DistributedSystem.connect(props);
- }
- };
+ new SerializableRunnable("Connect to " + locators) {
+ public void run() {
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", locators);
+ DistributedSystem.connect(props);
+ }
+ };
vm1.invoke(connect);
vm2.invoke(connect);
@@ -1392,19 +1374,19 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("mcast-port", "0");
props.setProperty("locators", locators);
- system = (InternalDistributedSystem)DistributedSystem.connect(props);
+ system = (InternalDistributedSystem) DistributedSystem.connect(props);
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
try {
return system.getDM().getViewMembers().size() >= 3;
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail("unexpected exception");
}
return false; // NOTREACHED
}
+
public String description() {
return null;
}
@@ -1417,14 +1399,14 @@ public class LocatorDUnitTest extends DistributedTestCase {
system.disconnect();
SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
- }
- }
- };
+ new SerializableRunnable("Disconnect from " + locators) {
+ public void run() {
+ DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
+ if (sys != null && sys.isConnected()) {
+ sys.disconnect();
+ }
+ }
+ };
vm1.invoke(disconnect);
vm2.invoke(disconnect);
@@ -1435,7 +1417,159 @@ public class LocatorDUnitTest extends DistributedTestCase {
vm0.invoke(getStopLocatorRunnable());
}
}
-
+
+ /**
+ * Tests starting multiple locators at the same time and ensuring that the locators
+ * end up only have 1 master.
+ * GEODE-870
+ */
+ public void testMultipleLocatorsRestartingAtSameTime() throws Exception {
+ disconnectAllFromDS();
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+ VM vm4 = host.getVM(4);
+
+ int[] freeTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+ this.port1 = freeTCPPorts[0];
+ this.port2 = freeTCPPorts[1];
+ int port3 = freeTCPPorts[2];
+ deleteLocatorStateFile(port1, port2, port3);
+ final String host0 = getServerHostName(host);
+ final String locators = host0 + "[" + port1 + "]," +
+ host0 + "[" + port2 + "]," +
+ host0 + "[" + port3 + "]";
+
+ final Properties dsProps = new Properties();
+ dsProps.setProperty("locators", locators);
+ dsProps.setProperty("mcast-port", "0");
+ dsProps.setProperty("log-level", "FINE");
+ dsProps.setProperty("enable-network-partition-detection", "true");
+ dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ final String uniqueName = getUniqueName();
+
+ startLocatorSync(vm0, new Object[] { port1, dsProps });
+ startLocatorSync(vm1, new Object[] { port2, dsProps });
+ startLocatorSync(vm2, new Object[] { port3, dsProps });
+ try {
+ try {
+
+ SerializableRunnable connect =
+ new SerializableRunnable("Connect to " + locators) {
+ public void run() {
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", locators);
+ props.setProperty("log-level", "FINE");
+ props.setProperty("enable-network-partition-detection", "true");
+ DistributedSystem.connect(props);
+ }
+ };
+ vm3.invoke(connect);
+ vm4.invoke(connect);
+
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", locators);
+
+ system = (InternalDistributedSystem) DistributedSystem.connect(props);
+
+ WaitCriterion waitCriterion = new WaitCriterion() {
+ public boolean done() {
+ try {
+ return system.getDM().getViewMembers().size() >= 3;
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("unexpected exception");
+ }
+ return false; // NOTREACHED
+ }
+
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(waitCriterion, 10 * 1000, 200, true);
+
+ // three applications plus
+ assertEquals(6, system.getDM().getViewMembers().size());
+
+ vm0.invoke(getStopLocatorRunnable());
+ vm1.invoke(getStopLocatorRunnable());
+ vm2.invoke(getStopLocatorRunnable());
+
+ final String newLocators = host0 + "[" + port2 + "]," +
+ host0 + "[" + port3 + "]";
+ dsProps.setProperty("locators", newLocators);
+
+ startLocatorAsync(vm1, new Object[] { port2, dsProps });
+ startLocatorAsync(vm2, new Object[] { port3, dsProps });
+
+ waitCriterion = new WaitCriterion() {
+ public boolean done() {
+ try {
+ return system.getDM().getAllHostedLocators().size() == 2;
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("unexpected exception");
+ }
+ return false; // NOTREACHED
+ }
+
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(waitCriterion, 10 * 1000, 200, true);
+
+ } finally {
+ system.disconnect();
+ SerializableRunnable disconnect =
+ new SerializableRunnable("Disconnect from " + locators) {
+ public void run() {
+ DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
+ if (sys != null && sys.isConnected()) {
+ sys.disconnect();
+ }
+ }
+ };
+ vm3.invoke(disconnect);
+ vm4.invoke(disconnect);
+ vm2.invoke(getStopLocatorRunnable());
+ vm1.invoke(getStopLocatorRunnable());
+ }
+ } finally {
+ }
+ }
+
+ private void startLocatorSync(VM vm, Object[] args) {
+ vm.invoke(new SerializableRunnable("Starting process on " + args[0], args) {
+ public void run() {
+ File logFile = new File("");
+ try {
+ Locator.startLocatorAndDS((int) args[0], logFile, (Properties) args[1]);
+ } catch (IOException ex) {
+ fail("While starting process on port " + args[0], ex);
+ }
+ }
+ });
+ }
+
+ private void startLocatorAsync(VM vm, Object[] args) {
+ vm.invokeAsync(new SerializableRunnable("Starting process on " + args[0], args) {
+ public void run() {
+ File logFile = new File("");
+ try {
+ Locator.startLocatorAndDS((int) args[0], logFile, (Properties) args[1]);
+ } catch (IOException ex) {
+ fail("While starting process on port " + args[0], ex);
+ }
+ }
+ });
+ }
+
/**
* Tests starting multiple locators in multiple VMs.
*/
@@ -1455,52 +1589,50 @@ public class LocatorDUnitTest extends DistributedTestCase {
this.port2 = port2;
deleteLocatorStateFile(port1, port2);
final int mcastport = AvailablePort.getRandomAvailablePort(AvailablePort.MULTICAST);
-
- final String host0 = getServerHostName(host);
+
+ final String host0 = getServerHostName(host);
final String locators = host0 + "[" + port1 + "]," +
- host0 + "[" + port2 + "]";
+ host0 + "[" + port2 + "]";
final String uniqueName = getUniqueName();
-
- vm0.invoke(new SerializableRunnable("Start locator on " + port1) {
- public void run() {
- File logFile = new File("");
- try {
- Properties props = new Properties();
- props.setProperty("mcast-port", String.valueOf(mcastport));
- props.setProperty("locators", locators);
- props.setProperty("log-level", getDUnitLogLevel());
- props.setProperty("mcast-ttl", "0");
- props.setProperty("enable-network-partition-detection", "true");
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- Locator.startLocatorAndDS(port1, logFile, null, props);
- }
- catch (IOException ex) {
- fail("While starting locator on port " + port1, ex);
- }
+ vm0.invoke(new SerializableRunnable("Start locator on " + port1) {
+ public void run() {
+ File logFile = new File("");
+ try {
+ Properties props = new Properties();
+ props.setProperty("mcast-port", String.valueOf(mcastport));
+ props.setProperty("locators", locators);
+ props.setProperty("log-level", getDUnitLogLevel());
+ props.setProperty("mcast-ttl", "0");
+ props.setProperty("enable-network-partition-detection", "true");
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+
+ Locator.startLocatorAndDS(port1, logFile, null, props);
+ } catch (IOException ex) {
+ fail("While starting locator on port " + port1, ex);
}
- });
+ }
+ });
vm3.invoke(new SerializableRunnable("Start locator on " + port2) {
- public void run() {
- File logFile = new File("");
- try {
- Properties props = new Properties();
- props.setProperty("mcast-port", String.valueOf(mcastport));
- props.setProperty("locators", locators);
- props.setProperty("log-level", getDUnitLogLevel());
- props.setProperty("mcast-ttl", "0");
- props.setProperty("enable-network-partition-detection", "true");
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- Locator.startLocatorAndDS(port2, logFile, null, props);
- }
- catch (IOException ex) {
- fail("While starting locator on port " + port2, ex);
- }
+ public void run() {
+ File logFile = new File("");
+ try {
+ Properties props = new Properties();
+ props.setProperty("mcast-port", String.valueOf(mcastport));
+ props.setProperty("locators", locators);
+ props.setProperty("log-level", getDUnitLogLevel());
+ props.setProperty("mcast-ttl", "0");
+ props.setProperty("enable-network-partition-detection", "true");
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ Locator.startLocatorAndDS(port2, logFile, null, props);
+ } catch (IOException ex) {
+ fail("While starting locator on port " + port2, ex);
}
- });
+ }
+ });
SerializableRunnable connect =
- new SerializableRunnable("Connect to " + locators) {
+ new SerializableRunnable("Connect to " + locators) {
public void run() {
Properties props = new Properties();
props.setProperty("mcast-port", String.valueOf(mcastport));
@@ -1522,17 +1654,17 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("mcast-ttl", "0");
props.setProperty("enable-network-partition-detection", "true");
- system = (InternalDistributedSystem)DistributedSystem.connect(props);
+ system = (InternalDistributedSystem) DistributedSystem.connect(props);
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
try {
return system.getDM().getViewMembers().size() == 5;
- }
- catch (Exception e) {
+ } catch (Exception e) {
fail("unexpected exception", e);
}
return false; // NOTREACHED
}
+
public String description() {
return "waiting for 5 members - have " + system.getDM().getViewMembers().size();
}
@@ -1541,7 +1673,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
system.disconnect();
SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
+ new SerializableRunnable("Disconnect from " + locators) {
public void run() {
DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
if (sys != null && sys.isConnected()) {
@@ -1551,8 +1683,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
};
vm1.invoke(disconnect);
vm2.invoke(disconnect);
- }
- finally {
+ } finally {
SerializableRunnable stop = getStopLocatorRunnable();
vm0.invoke(stop);
vm3.invoke(stop);
@@ -1562,7 +1693,6 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
}
-
/**
* Tests that a VM can connect to a locator that is hosted in its
* own VM.
@@ -1572,22 +1702,22 @@ public class LocatorDUnitTest extends DistributedTestCase {
Host host = Host.getHost(0);
port1 =
- AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
deleteLocatorStateFile(port1);
File logFile = new File("");
Locator locator = Locator.startLocator(port1, logFile);
try {
- final String locators = getServerHostName(host) + "[" + port1 + "]";
+ final String locators = getServerHostName(host) + "[" + port1 + "]";
- Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", locators);
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- system = (InternalDistributedSystem)DistributedSystem.connect(props);
- system.disconnect();
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", locators);
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ system = (InternalDistributedSystem) DistributedSystem.connect(props);
+ system.disconnect();
} finally {
- locator.stop();
+ locator.stop();
}
}
@@ -1600,7 +1730,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
Locator locator = null;
-
+
try {
port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
deleteLocatorStateFile(port1);
@@ -1613,37 +1743,36 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
File logFile = new File("");
locator = Locator.startLocatorAndDS(port1, logFile, properties);
-
+
final Properties properties2 = new Properties();
properties2.put("mcast-port", "0");
properties2.put("locators", locators);
properties2.put(DistributionConfig.ENABLE_NETWORK_PARTITION_DETECTION_NAME, "false");
properties2.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
properties2.put("disable-auto-reconnect", "true");
-
+
vm1.invoke(new SerializableRunnable("try to connect") {
public void run() {
DistributedSystem s = null;
try {
s = DistributedSystem.connect(properties2);
- boolean enabled = ((InternalDistributedSystem)s).getConfig().getEnableNetworkPartitionDetection();
+ boolean enabled = ((InternalDistributedSystem) s).getConfig().getEnableNetworkPartitionDetection();
s.disconnect();
if (!enabled) {
fail("should not have been able to connect with different enable-network-partition-detection settings");
}
- }
- catch (GemFireConfigException e) {
+ } catch (GemFireConfigException e) {
fail("should have been able to connect and have enable-network-partion-detection enabled");
}
}
});
-
+
locator.stop();
-
+
// now start the locator with enable-network-partition-detection=false
logFile = new File("");
- locator = Locator.startLocatorAndDS(port1, logFile , properties2);
-
+ locator = Locator.startLocatorAndDS(port1, logFile, properties2);
+
vm1.invoke(new SerializableRunnable("try to connect") {
public void run() {
DistributedSystem s = null;
@@ -1651,17 +1780,15 @@ public class LocatorDUnitTest extends DistributedTestCase {
s = DistributedSystem.connect(properties);
s.disconnect();
fail("should not have been able to connect with different enable-network-partition-detection settings");
- }
- catch (GemFireConfigException e) {
+ } catch (GemFireConfigException e) {
// passed
}
}
});
-
+
locator.stop();
locator = null;
- }
- finally {
+ } finally {
if (locator != null) {
locator.stop();
}
@@ -1678,57 +1805,57 @@ public class LocatorDUnitTest extends DistributedTestCase {
//VM vm1 = host.getVM(1);
port1 =
- AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
File logFile1 = new File("");
deleteLocatorStateFile(port1);
Locator locator1 = Locator.startLocator(port1, logFile1);
-
+
try {
- int port2 =
- AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- File logFile2 = new File("");
+ int port2 =
+ AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ File logFile2 = new File("");
- deleteLocatorStateFile(port2);
-
- try {
- Locator locator2 = Locator.startLocator(port2, logFile2);
- fail("expected second locator start to fail.");
- } catch (IllegalStateException expected) {
- }
+ deleteLocatorStateFile(port2);
- final String host0 = getServerHostName(host);
- final String locators = host0 + "[" + port1 + "]," +
- host0 + "[" + port2 + "]";
+ try {
+ Locator locator2 = Locator.startLocator(port2, logFile2);
+ fail("expected second locator start to fail.");
+ } catch (IllegalStateException expected) {
+ }
- SerializableRunnable connect =
- new SerializableRunnable("Connect to " + locators) {
- public void run() {
- Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", locators);
- props.setProperty("log-level", getDUnitLogLevel());
- DistributedSystem.connect(props);
- }
- };
- connect.run();
- //vm1.invoke(connect);
+ final String host0 = getServerHostName(host);
+ final String locators = host0 + "[" + port1 + "]," +
+ host0 + "[" + port2 + "]";
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
+ SerializableRunnable connect =
+ new SerializableRunnable("Connect to " + locators) {
+ public void run() {
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", locators);
+ props.setProperty("log-level", getDUnitLogLevel());
+ DistributedSystem.connect(props);
}
- }
- };
+ };
+ connect.run();
+ //vm1.invoke(connect);
+
+ SerializableRunnable disconnect =
+ new SerializableRunnable("Disconnect from " + locators) {
+ public void run() {
+ DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
+ if (sys != null && sys.isConnected()) {
+ sys.disconnect();
+ }
+ }
+ };
- disconnect.run();
- //vm1.invoke(disconnect);
+ disconnect.run();
+ //vm1.invoke(disconnect);
} finally {
- locator1.stop();
+ locator1.stop();
}
}
@@ -1741,13 +1868,13 @@ public class LocatorDUnitTest extends DistributedTestCase {
public void testRestartLocator() throws Exception {
disconnectAllFromDS();
port1 =
- AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
deleteLocatorStateFile(port1);
File logFile = new File("");
- File stateFile = new File("locator"+port1+"state.dat");
+ File stateFile = new File("locator" + port1 + "state.dat");
VM vm0 = Host.getHost(0).getVM(0);
final Properties p = new Properties();
- p.setProperty(DistributionConfig.LOCATORS_NAME, Host.getHost(0).getHostName() + "["+port1+"]");
+ p.setProperty(DistributionConfig.LOCATORS_NAME, Host.getHost(0).getHostName() + "[" + port1 + "]");
p.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
p.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
if (stateFile.exists()) {
@@ -1757,50 +1884,56 @@ public class LocatorDUnitTest extends DistributedTestCase {
getLogWriter().info("Starting locator");
Locator locator = Locator.startLocatorAndDS(port1, logFile, p);
try {
-
- SerializableRunnable connect =
- new SerializableRunnable("Connect to locator on port " + port1) {
- public void run() {
- DistributedSystem.connect(p);
- }
- };
- vm0.invoke(connect);
-
- getLogWriter().info("Stopping locator");
- locator.stop();
-
- getLogWriter().info("Starting locator");
- locator = Locator.startLocatorAndDS(port1, logFile, p);
-
- vm0.invoke(new SerializableRunnable("disconnect") {
- public void run() {
- DistributedSystem.connect(p).disconnect();
- }
- });
-
+
+ SerializableRunnable connect =
+ new SerializableRunnable("Connect to locator on port " + port1) {
+ public void run() {
+ DistributedSystem.connect(p);
+ }
+ };
+ vm0.invoke(connect);
+
+ getLogWriter().info("Stopping locator");
+ locator.stop();
+
+ getLogWriter().info("Starting locator");
+ locator = Locator.startLocatorAndDS(port1, logFile, p);
+
+ vm0.invoke(new SerializableRunnable("disconnect") {
+ public void run() {
+ DistributedSystem.connect(p).disconnect();
+ }
+ });
+
} finally {
- locator.stop();
+ locator.stop();
}
}
-
- /** return the distributed member id for the ds on this vm */
+
+ /**
+ * return the distributed member id for the ds on this vm
+ */
public static DistributedMember getDistributedMember(Properties props) {
- props.put("name", "vm_"+VM.getCurrentVMNum());
+ props.put("name", "vm_" + VM.getCurrentVMNum());
DistributedSystem sys = DistributedSystem.connect(props);
sys.getLogWriter().info("<ExpectedException action=add>service failure</ExpectedException>");
sys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ConnectException</ExpectedException>");
sys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
return DistributedSystem.connect(props).getDistributedMember();
}
-
- /** find a running locator and return its distributed member id */
+
+ /**
+ * find a running locator and return its distributed member id
+ */
public static DistributedMember getLocatorDistributedMember() {
return (Locator.getLocators().iterator().next())
- .getDistributedSystem().getDistributedMember();
+ .getDistributedSystem().getDistributedMember();
}
-
- /** find the lead member and return its id */
+
+ /**
+ * find the lead member and return its id
+ */
public static DistributedMember getLeadMember() {
DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
return MembershipManagerHelper.getLeadMember(sys);
@@ -1818,7 +1951,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
};
}
-
+
private SerializableRunnable getStartSBLocatorRunnable(final int port, final String name) {
return new SerializableRunnable("Start locator on port " + port) {
public void run() {
@@ -1832,34 +1965,32 @@ public class LocatorDUnitTest extends DistributedTestCase {
Locator.startLocatorAndDS(port, logFile, locProps);
} catch (IOException ex) {
fail("While starting locator on port " + port, ex);
- }
- finally {
+ } finally {
System.getProperties().remove(InternalLocator.LOCATORS_PREFERRED_AS_COORDINATORS);
System.getProperties().remove("p2p.joinTimeout");
}
}
};
}
-
+
protected void nukeJChannel(DistributedSystem sys) {
sys.getLogWriter().info("<ExpectedException action=add>service failure</ExpectedException>");
sys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ConnectException</ExpectedException>");
sys.getLogWriter().info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
try {
MembershipManagerHelper.crashDistributedSystem(sys);
- }
- catch (DistributedSystemDisconnectedException se) {
+ } catch (DistributedSystemDisconnectedException se) {
// it's okay for the system to already be shut down
}
sys.getLogWriter().info("<ExpectedException action=remove>service failure</ExpectedException>");
sys.getLogWriter().info("<ExpectedException action=remove>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
}
-
//New test hook which blocks before closing channel.
class TestHook implements MembershipTestHook {
volatile boolean unboundedWait = true;
+
@Override
public void beforeMembershipFailure(String reason, Throwable cause) {
System.out.println("Inside TestHook.beforeMembershipFailure with " + cause);
@@ -1882,22 +2013,27 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
}
+
class MyMembershipListener implements MembershipListener {
boolean quorumLostInvoked;
List<String> suspectReasons = new ArrayList<>(50);
-
- public void memberJoined(InternalDistributedMember id) { }
- public void memberDeparted(InternalDistributedMember id, boolean crashed) { }
+
+ public void memberJoined(InternalDistributedMember id) {
+ }
+
+ public void memberDeparted(InternalDistributedMember id, boolean crashed) {
+ }
+
public void memberSuspect(InternalDistributedMember id,
InternalDistributedMember whoSuspected, String reason) {
suspectReasons.add(reason);
}
+
public void quorumLost(Set<InternalDistributedMember> failures,
List<InternalDistributedMember> remaining) {
quorumLostInvoked = true;
getLogWriter().info("quorumLost invoked in test code");
}
}
-
-
+
}
[3/4] incubator-geode git commit: GEODE-870: Rejecting of old view
Posted by ud...@apache.org.
GEODE-870: Rejecting of old view
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a0759a87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a0759a87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a0759a87
Branch: refs/heads/feature/GEODE-870
Commit: a0759a871cbc4e40e3e1d4ff2a54415ba40865de
Parents: 3bd930f
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Mon Feb 1 14:08:32 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Mon Feb 1 14:08:32 2016 +1100
----------------------------------------------------------------------
.../membership/gms/membership/GMSJoinLeave.java | 432 +++---
.../gms/messages/ViewRejectMessage.java | 96 ++
.../internal/DataSerializableFixedID.java | 3 +-
.../gemfire/distributed/LocatorDUnitTest.java | 1360 ++++++++++--------
.../test/dunit/SerializableRunnable.java | 64 +-
5 files changed, 1117 insertions(+), 838 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0759a87/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 6b09214..5898b23 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -16,41 +16,6 @@
*/
package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_REQ;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_RESP;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.INSTALL_VIEW_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.LEAVE_REQUEST_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.NETWORK_PARTITION_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.REMOVE_MEMBER_REQUEST;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.VIEW_ACK_MESSAGE;
-import static com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig.MEMBER_REQUEST_COLLECTION_INTERVAL;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.logging.log4j.Logger;
-
import com.gemstone.gemfire.GemFireConfigException;
import com.gemstone.gemfire.SystemConnectException;
import com.gemstone.gemfire.distributed.DistributedMember;
@@ -68,120 +33,180 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinL
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HasMemberID;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.NetworkPartitionMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.*;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.security.AuthenticationFailedException;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig.MEMBER_REQUEST_COLLECTION_INTERVAL;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.*;
/**
* GMSJoinLeave handles membership communication with other processes in the
* distributed system. It replaces the JGroups channel membership services
* that Geode formerly used for this purpose.
- *
*/
public class GMSJoinLeave implements JoinLeave, MessageHandler {
-
+
public static final String BYPASS_DISCOVERY_PROPERTY = "gemfire.bypass-discovery";
- /** amount of time to wait for responses to FindCoordinatorRequests */
+ /**
+ * amount of time to wait for responses to FindCoordinatorRequests
+ */
private static final int DISCOVERY_TIMEOUT = Integer.getInteger("gemfire.discovery-timeout", 3000);
- /** amount of time to sleep before trying to join after a failed attempt */
+ /**
+ * amount of time to sleep before trying to join after a failed attempt
+ */
private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
- /** time to wait for a broadcast message to be transmitted by jgroups */
+ /**
+ * time to wait for a broadcast message to be transmitted by jgroups
+ */
private static final long BROADCAST_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.broadcast-message-sleep-time", 1000);
- /** if the locators don't know who the coordinator is we send find-coord requests to this many nodes */
+ /**
+ * if the locators don't know who the coordinator is we send find-coord requests to this many nodes
+ */
private static final int MAX_DISCOVERY_NODES = Integer.getInteger("gemfire.max-discovery-nodes", 30);
- /** interval for broadcasting the current view to members in case they didn't get it the first time */
+ /**
+ * interval for broadcasting the current view to members in case they didn't get it the first time
+ */
private static final long VIEW_BROADCAST_INTERVAL = Long.getLong("gemfire.view-broadcast-interval", 60000);
- /** membership logger */
+ /**
+ * membership logger
+ */
private static final Logger logger = Services.getLogger();
- /** the view ID where I entered into membership */
+ /**
+ * the view ID where I entered into membership
+ */
private int birthViewId;
- /** my address */
+ /**
+ * my address
+ */
private InternalDistributedMember localAddress;
private Services services;
- /** have I connected to the distributed system? */
+ /**
+ * have I connected to the distributed system?
+ */
private volatile boolean isJoined;
- /** guarded by viewInstallationLock */
+ /**
+ * guarded by viewInstallationLock
+ */
private boolean isCoordinator;
- /** a synch object that guards view installation */
+ /**
+ * a synch object that guards view installation
+ */
private final Object viewInstallationLock = new Object();
-
- /** the currently installed view. Guarded by viewInstallationLock */
+
+ /**
+ * the currently installed view. Guarded by viewInstallationLock
+ */
private volatile NetView currentView;
- /** the previous view **/
+ /**
+ * the previous view
+ **/
private volatile NetView previousView;
private final Set<InternalDistributedMember> removedMembers = new HashSet<>();
- /** members who we've received a leave message from **/
+ /**
+ * members who we've received a leave message from
+ **/
private final Set<InternalDistributedMember> leftMembers = new HashSet<>();
- /** a new view being installed */
+ /**
+ * a new view being installed
+ */
private NetView preparedView;
- /** the last view that conflicted with view preparation */
+ /**
+ * the last view that conflicted with view preparation
+ */
private NetView lastConflictingView;
private List<InetSocketAddress> locators;
- /** a list of join/leave/crashes */
+ /**
+ * a list of join/leave/crashes
+ */
private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
- /** the established request collection jitter. This can be overridden for testing with delayViewCreationForTest */
+ /**
+ * the established request collection jitter. This can be overridden for testing with delayViewCreationForTest
+ */
long requestCollectionInterval = MEMBER_REQUEST_COLLECTION_INTERVAL;
- /** collects the response to a join request */
+ /**
+ * collects the response to a join request
+ */
private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
- /** collects responses to new views */
+ /**
+ * collects responses to new views
+ */
private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
- /** collects responses to view preparation messages */
+ /**
+ * collects responses to view preparation messages
+ */
private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
- /** whether quorum checks can cause a forced-disconnect */
+ /**
+ * whether quorum checks can cause a forced-disconnect
+ */
private boolean quorumRequired = false;
- /** timeout in receiving view acknowledgement */
+ /**
+ * timeout in receiving view acknowledgement
+ */
private int viewAckTimeout;
- /** background thread that creates new membership views */
+ /**
+ * background thread that creates new membership views
+ */
private ViewCreator viewCreator;
- /** am I shutting down? */
+ /**
+ * am I shutting down?
+ */
private volatile boolean isStopping;
- /** state of collected artifacts during discovery */
+ /**
+ * state of collected artifacts during discovery
+ */
final SearchState searchState = new SearchState();
- /** a collection used to detect unit testing */
+ /**
+ * a collection used to detect unit testing
+ */
Set<String> unitTesting = new HashSet<>();
-
- /** a test hook to make this member unresponsive */
+
+ /**
+ * a test hook to make this member unresponsive
+ */
private volatile boolean playingDead;
-
- /** the view where quorum was most recently lost */
+
+ /**
+ * the view where quorum was most recently lost
+ */
NetView quorumLostView;
static class SearchState {
@@ -207,44 +232,44 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* attempt to join the distributed system
* loop
- * send a join request to a locator & get a response
- *
+ * send a join request to a locator & get a response
+ * <p>
* If the response indicates there's no coordinator it
* will contain a set of members that have recently contacted
* it. The "oldest" member is selected as the coordinator
* based on ID sort order.
- *
+ *
* @return true if successful, false if not
*/
public boolean join() {
try {
if (Boolean.getBoolean(BYPASS_DISCOVERY_PROPERTY)) {
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
becomeCoordinator();
}
return true;
}
SearchState state = searchState;
-
- long locatorWaitTime = ((long)services.getConfig().getLocatorWaitTime()) * 1000L;
+
+ long locatorWaitTime = ((long) services.getConfig().getLocatorWaitTime()) * 1000L;
long timeout = services.getConfig().getJoinTimeout();
logger.debug("join timeout is set to {}", timeout);
- long retrySleep = JOIN_RETRY_SLEEP;
+ long retrySleep = JOIN_RETRY_SLEEP;
long startTime = System.currentTimeMillis();
long locatorGiveUpTime = startTime + locatorWaitTime;
long giveupTime = startTime + timeout;
- for (int tries=0; !this.isJoined && !this.isStopping; tries++) {
+ for (int tries = 0; !this.isJoined && !this.isStopping; tries++) {
logger.debug("searching for the membership coordinator");
boolean found = findCoordinator();
if (found) {
logger.debug("found possible coordinator {}", state.possibleCoordinator);
if (localAddress.getNetMember().preferredForCoordinator()
&& state.possibleCoordinator.equals(this.localAddress)) {
- if (tries > 2 || System.currentTimeMillis() < giveupTime ) {
- synchronized(viewInstallationLock) {
+ if (tries > 2 || System.currentTimeMillis() < giveupTime) {
+ synchronized (viewInstallationLock) {
becomeCoordinator();
}
return true;
@@ -291,14 +316,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// unable to contact any of the locators
if (!this.isJoined && state.hasContactedAJoinedLocator) {
throw new SystemConnectException("Unable to join the distributed system in "
- + (System.currentTimeMillis()-startTime) + "ms");
+ + (System.currentTimeMillis() - startTime) + "ms");
}
return this.isJoined;
} finally {
// notify anyone waiting on the address to be completed
if (this.isJoined) {
- synchronized(this.localAddress) {
+ synchronized (this.localAddress) {
this.localAddress.notifyAll();
}
}
@@ -309,11 +334,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* send a join request and wait for a reply. Process the reply.
* This may throw a SystemConnectException or an AuthenticationFailedException
- *
+ *
* @param coord
* @return true if the attempt succeeded, false if it timed out
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "WA_NOT_IN_LOOP")
boolean attemptToJoin() {
SearchState state = searchState;
@@ -335,7 +360,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
Thread.currentThread().interrupt();
return false;
}
-
+
if (response == null) {
if (!isJoined) {
logger.debug("received no join response");
@@ -347,13 +372,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
joinResponse[0] = null;
String failReason = response.getRejectionMessage();
if (failReason != null) {
- if (failReason.contains("Rejecting the attempt of a member using an older version")
+ if (failReason.contains("Rejecting the attempt of a member using an older version")
|| failReason.contains("15806")) {
throw new SystemConnectException(failReason);
}
throw new AuthenticationFailedException(failReason);
}
-
+
if (response.getCurrentView() == null) {
logger.info("received join response with no membership view: {}", response);
return isJoined;
@@ -361,13 +386,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (response.getBecomeCoordinator()) {
logger.info("I am being told to become the membership coordinator by {}", coord);
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
this.currentView = response.getCurrentView();
becomeCoordinator(null);
}
return true;
}
-
+
this.birthViewId = response.getMemberID().getVmViewId();
this.localAddress.setVmViewId(this.birthViewId);
GMSMember me = (GMSMember) this.localAddress.getNetMember();
@@ -397,7 +422,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* this method will enqueue the request for processing in another thread.
* If this is not the coordinator but the coordinator is known, the message
* is forwarded to the coordinator.
- *
+ *
* @param incomingRequest
*/
private void processJoinRequest(JoinRequestMessage incomingRequest) {
@@ -426,7 +451,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
- if (!this.localAddress.getNetMember().preferredForCoordinator() &&
+ if (!this.localAddress.getNetMember().preferredForCoordinator() &&
incomingRequest.getMemberID().getNetMember().preferredForCoordinator()) {
JoinResponseMessage m = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
services.getMessenger().send(m);
@@ -439,7 +464,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* Process a Leave request from another member. This may cause this member
* to become the new membership coordinator. If this is the coordinator
* a new view will be triggered.
- *
+ *
* @param incomingRequest
*/
private void processLeaveRequest(LeaveRequestMessage incomingRequest) {
@@ -451,13 +476,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
recordViewRequest(incomingRequest);
return;
}
-
-
+
InternalDistributedMember mbr = incomingRequest.getMemberID();
if (logger.isDebugEnabled()) {
- logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
- +"; cancelInProgress="+ services.getCancelCriterion().isCancelInProgress());
+ logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator=" + isCoordinator + "; isStopping=" + isStopping
+ + "; cancelInProgress=" + services.getCancelCriterion().isCancelInProgress());
}
if (!v.contains(mbr) && mbr.getVmViewId() < v.getViewId()) {
@@ -478,12 +502,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
check.removeAll(removedMembers);
check.addCrashedMembers(removedMembers);
}
- synchronized(leftMembers) {
+ synchronized (leftMembers) {
leftMembers.add(mbr);
check.removeAll(leftMembers);
}
if (check.getCoordinator().equals(localAddress)) {
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
becomeCoordinator(incomingRequest.getMemberID());
}
}
@@ -500,7 +524,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* Process a Remove request from another member. This may cause this member
* to become the new membership coordinator. If this is the coordinator
* a new view will be triggered.
- *
+ *
* @param incomingRequest
*/
private void processRemoveRequest(RemoveMemberMessage incomingRequest) {
@@ -512,15 +536,15 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender());
return;
}
-
+
if (v == null) {
// not yet a member
return;
}
logger.info("Membership received a request to remove " + mbr
- + " from " + incomingRequest.getSender()
- + " reason="+incomingRequest.getReason());
+ + " from " + incomingRequest.getSender()
+ + " reason=" + incomingRequest.getReason());
if (mbr.equals(this.localAddress)) {
// oops - I've been kicked out
@@ -541,11 +565,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
check.addCrashedMembers(removedMembers);
check.removeAll(removedMembers);
}
- synchronized(leftMembers) {
+ synchronized (leftMembers) {
check.removeAll(leftMembers);
}
if (check.getCoordinator().equals(localAddress)) {
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
becomeCoordinator(mbr);
}
}
@@ -589,40 +613,41 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
void becomeCoordinator() { // package access for unit testing
becomeCoordinator(null);
}
-
+
public void becomeCoordinatorForTest() {
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
becomeCoordinator();
}
}
-
+
/**
* Test hook for delaying the creation of new views.
* This should be invoked before this member becomes coordinator
* and creates its ViewCreator thread.
+ *
* @param millis
*/
public void delayViewCreationForTest(int millis) {
requestCollectionInterval = millis;
}
-
/**
* Transitions this member into the coordinator role. This must
* be invoked under a synch on viewInstallationLock that was held
* at the time the decision was made to become coordinator so that
* the decision is atomic with actually becoming coordinator.
+ *
* @param oldCoordinator may be null
*/
private void becomeCoordinator(InternalDistributedMember oldCoordinator) {
boolean testing = unitTesting.contains("noRandomViewChange");
assert Thread.holdsLock(viewInstallationLock);
-
+
if (isCoordinator) {
return;
}
-
+
logger.info("This member is becoming the membership coordinator with address {}", localAddress);
isCoordinator = true;
if (currentView == null) {
@@ -643,8 +668,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
NetView newView;
Set<InternalDistributedMember> leaving = new HashSet<>();
Set<InternalDistributedMember> removals;
- synchronized(viewInstallationLock) {
- int rand = testing? 0 : NetView.RANDOM.nextInt(10);
+ synchronized (viewInstallationLock) {
+ int rand = testing ? 0 : NetView.RANDOM.nextInt(10);
int viewNumber = currentView.getViewId() + 5 + rand;
if (this.localAddress.getVmViewId() < 0) {
this.localAddress.setVmViewId(viewNumber);
@@ -653,10 +678,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (!mbrs.contains(localAddress)) {
mbrs.add(localAddress);
}
- synchronized(this.removedMembers) {
+ synchronized (this.removedMembers) {
removals = new HashSet<>(this.removedMembers);
}
- synchronized(this.leftMembers) {
+ synchronized (this.leftMembers) {
leaving.addAll(leftMembers);
}
if (oldCoordinator != null && !removals.contains(oldCoordinator)) {
@@ -725,7 +750,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
StringBuilder s = new StringBuilder();
int[] ports = view.getFailureDetectionPorts();
int numMembers = view.size();
- for (int i=0; i<numMembers; i++) {
+ for (int i = 0; i < numMembers; i++) {
if (i > 0) {
s.append(' ');
}
@@ -754,7 +779,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender();
NetView conflictingView = rp.getConflictingView();
if (conflictingView != null) {
- logger.warn("received a conflicting membership view from " + conflictingViewSender
+ logger.warn("received a conflicting membership view from " + conflictingViewSender
+ " during preparation: " + conflictingView);
return false;
}
@@ -774,7 +799,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (currentView != null && view.getViewId() < currentView.getViewId()) {
// ignore old views
- ackView(m);
+ rejectView(m,"The rejected view is old");
return;
}
@@ -782,7 +807,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (!this.isJoined) {
// if we're still waiting for a join response and we're in this view we
// should install the view so join() can finish its work
- for (InternalDistributedMember mbr: view.getMembers()) {
+ for (InternalDistributedMember mbr : view.getMembers()) {
if (localAddress.compareTo(mbr) == 0) {
viewContainsMyUnjoinedAddress = true;
break;
@@ -825,6 +850,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
+ private void rejectView(InstallViewMessage m, String rejectionReason) {
+ if (!playingDead && m.getView().contains(m.getView().getCreator())) {
+ services.getMessenger().send(new ViewRejectMessage(m.getSender(), m.getView().getViewId(), m.getView(), rejectionReason));
+ }
+ }
+
private void processViewAckMessage(ViewAckMessage m) {
if (m.isPrepareAck()) {
this.prepareProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
@@ -834,14 +865,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
private TcpClientWrapper tcpClientWrapper = new TcpClientWrapper();
-
+
/***
* testing purpose
+ *
* @param tcpClientWrapper
*/
void setTcpClientWrapper(TcpClientWrapper tcpClientWrapper) {
this.tcpClientWrapper = tcpClientWrapper;
}
+
/**
* This contacts the locators to find out who the current coordinator is.
* All locators are contacted. If they don't agree then we choose the oldest
@@ -851,21 +884,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
SearchState state = searchState;
assert this.localAddress != null;
-
+
// If we've already tried to bootstrap from locators that
// haven't joined the system (e.g., a collocated locator)
// then jump to using the membership view to try to find
// the coordinator
- if ( !state.hasContactedAJoinedLocator && state.view != null) {
+ if (!state.hasContactedAJoinedLocator && state.view != null) {
return findCoordinatorFromView();
}
FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId);
Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
-
- long giveUpTime = System.currentTimeMillis() + ((long)services.getConfig().getLocatorWaitTime() * 1000L);
-
- int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2;
+
+ long giveUpTime = System.currentTimeMillis() + ((long) services.getConfig().getLocatorWaitTime() * 1000L);
+
+ int connectTimeout = (int) services.getConfig().getMemberTimeout() * 2;
boolean anyResponses = false;
boolean flagsSet = false;
@@ -873,12 +906,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
state.hasContactedAJoinedLocator = false;
state.locatorsContacted = 0;
-
+
do {
for (InetSocketAddress addr : locators) {
try {
Object o = tcpClientWrapper.sendCoordinatorFindRequest(addr, request, connectTimeout);
- FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
+ FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse) o : null;
if (response != null) {
state.locatorsContacted++;
if (!state.hasContactedAJoinedLocator &&
@@ -890,7 +923,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (response.getCoordinator() != null) {
anyResponses = true;
NetView v = response.getView();
- int viewId = v == null? -1 : v.getViewId();
+ int viewId = v == null ? -1 : v.getViewId();
if (viewId > state.viewId) {
state.viewId = viewId;
state.view = v;
@@ -910,8 +943,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
} while (!anyResponses && System.currentTimeMillis() < giveUpTime);
-
-
+
if (coordinators.isEmpty()) {
return false;
}
@@ -931,7 +963,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
InternalDistributedMember coord = null;
boolean coordIsNoob = true;
- for (; it.hasNext();) {
+ for (; it.hasNext(); ) {
InternalDistributedMember mbr = it.next();
if (!state.alreadyTried.contains(mbr)) {
boolean mbrIsNoob = (mbr.getVmViewId() < 0);
@@ -951,17 +983,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
return true;
}
-
+
protected class TcpClientWrapper {
- protected Object sendCoordinatorFindRequest(InetSocketAddress addr, FindCoordinatorRequest request, int connectTimeout)
- throws ClassNotFoundException, IOException{
+ protected Object sendCoordinatorFindRequest(InetSocketAddress addr, FindCoordinatorRequest request, int connectTimeout)
+ throws ClassNotFoundException, IOException {
return TcpClient.requestToServer(
- addr.getAddress(), addr.getPort(), request, connectTimeout,
+ addr.getAddress(), addr.getPort(), request, connectTimeout,
true);
}
- }
+ }
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "WA_NOT_IN_LOOP")
boolean findCoordinatorFromView() {
ArrayList<FindCoordinatorResponse> result;
SearchState state = searchState;
@@ -1033,8 +1065,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void inheritSettingsFromLocator(InetSocketAddress addr, FindCoordinatorResponse response) {
boolean enabled = response.isNetworkPartitionDetectionEnabled();
if (!enabled && services.getConfig().isNetworkPartitionDetectionEnabled()) {
- throw new GemFireConfigException("locator at "+addr
- +" does not have network-partition-detection enabled but my configuration has it enabled");
+ throw new GemFireConfigException("locator at " + addr
+ + " does not have network-partition-detection enabled but my configuration has it enabled");
}
GMSMember mbr = (GMSMember) this.localAddress.getNetMember();
@@ -1045,8 +1077,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (response.isUsePreferredCoordinators()) {
this.quorumRequired = true;
logger.debug("The locator indicates that all locators should be preferred as coordinators");
- if (services.getLocator() != null
- || Locator.hasLocator()
+ if (services.getLocator() != null
+ || Locator.hasLocator()
|| !services.getConfig().getDistributionConfig().getStartLocator().isEmpty()
|| localAddress.getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
((GMSMember) localAddress.getNetMember()).setPreferredForCoordinator(true);
@@ -1058,7 +1090,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* receives a JoinResponse holding a membership view or rejection message
- *
+ *
* @param rsp
*/
private void processJoinResponse(JoinResponseMessage rsp) {
@@ -1067,16 +1099,19 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
joinResponse.notifyAll();
}
}
-
+
/**
* for testing, do not use in any other case as it is not thread safe
+ *
* @param req
*/
JoinResponseMessage[] getJoinResponseMessage() {
return joinResponse;
}
+
/***
* for testing purpose
+ *
* @param jrm
*/
void setJoinResponseMessage(JoinResponseMessage jrm) {
@@ -1148,7 +1183,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
}
-
+
if (isJoined && isNetworkPartition(newView, true)) {
if (quorumRequired) {
Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
@@ -1156,7 +1191,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
}
-
+
previousView = currentView;
currentView = newView;
preparedView = null;
@@ -1166,7 +1201,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (!isJoined) {
logger.debug("notifying join thread");
isJoined = true;
- synchronized(joinResponse) {
+ synchronized (joinResponse) {
joinResponse.notifyAll();
}
}
@@ -1185,7 +1220,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// newer than the view just processed - the senders will have to
// resend these
synchronized (viewRequests) {
- for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext();) {
+ for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) {
DistributionMessage m = it.next();
if (m instanceof JoinRequestMessage) {
it.remove();
@@ -1205,7 +1240,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
synchronized (removedMembers) {
removeMembersFromCollectionIfNotInView(removedMembers, currentView);
}
- synchronized(leftMembers) {
+ synchronized (leftMembers) {
removeMembersFromCollectionIfNotInView(leftMembers, currentView);
}
}
@@ -1214,7 +1249,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
Iterator<InternalDistributedMember> iterator = members.iterator();
while (iterator.hasNext()) {
if (!currentView.contains(iterator.next())) {
- iterator.remove();
+ iterator.remove();
}
}
}
@@ -1222,7 +1257,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* Sends a message declaring a network partition to the
* members of the given view via Messenger
- *
+ *
* @param view
*/
void sendNetworkPartitionMessage(NetView view) {
@@ -1288,7 +1323,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
viewCreator.shutdown();
}
}
-
+
private void startViewBroadcaster() {
services.getTimer().schedule(new ViewBroadcaster(), VIEW_BROADCAST_INTERVAL, VIEW_BROADCAST_INTERVAL);
}
@@ -1348,7 +1383,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
stopCoordinatorServices();
if (view != null) {
if (view.size() > 1) {
- List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 5);
+ List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 5);
logger.debug("JoinLeave sending a leave request to {}", coords);
LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down");
services.getMessenger().send(m);
@@ -1371,7 +1406,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
processRemoveRequest(msg);
if (!this.isCoordinator) {
msg.resetRecipients();
- msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 10));
+ msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 10));
services.getMessenger().send(msg);
}
}
@@ -1379,8 +1414,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
@Override
public void memberShutdown(DistributedMember mbr, String reason) {
- LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember)mbr, reason);
- msg.setSender((InternalDistributedMember)mbr);
+ LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember) mbr, reason);
+ msg.setSender((InternalDistributedMember) mbr);
processLeaveRequest(msg);
}
@@ -1494,16 +1529,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/***
* test method
+ *
* @return ViewReplyProcessor
*/
protected ViewReplyProcessor getPrepareViewReplyProcessor() {
return prepareProcessor;
}
-
- protected boolean testPrepareProcessorWaiting(){
+
+ protected boolean testPrepareProcessorWaiting() {
return prepareProcessor.isWaiting();
}
-
+
class ViewReplyProcessor {
volatile int viewId = -1;
final Set<InternalDistributedMember> notRepliedYet = new HashSet<>();
@@ -1526,9 +1562,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
pendingRemovals.clear();
}
- boolean isWaiting(){
+ boolean isWaiting() {
return waiting;
}
+
synchronized void processPendingRequests(Set<InternalDistributedMember> pendingLeaves, Set<InternalDistributedMember> pendingRemovals) {
// there's no point in waiting for members who have already
// requested to leave or who have been declared crashed.
@@ -1588,13 +1625,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- /** call with synchronized(this) */
+ /**
+ * call with synchronized(this)
+ */
private void stopWaitingFor(InternalDistributedMember mbr) {
notRepliedYet.remove(mbr);
checkIfDone();
}
- /** call with synchronized(this) */
+ /**
+ * call with synchronized(this)
+ */
private void checkIfDone() {
if (notRepliedYet.isEmpty() || (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) {
logger.debug("All anticipated view responses received - notifying waiting thread");
@@ -1624,7 +1665,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
} finally {
- synchronized(this) {
+ synchronized (this) {
if (!this.waiting) {
// if we've set waiting to false due to incoming messages then
// we've discounted receiving any other responses from the
@@ -1667,7 +1708,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
sendCurrentView();
}
}
-
+
void sendCurrentView() {
NetView v = currentView;
if (v != null) {
@@ -1682,7 +1723,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
services.getMessenger().sendUnreliably(msg);
}
}
-
+
}
class ViewCreator extends Thread {
@@ -1730,9 +1771,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* All views should be sent by the ViewCreator thread, so
* if this member becomes coordinator it may have an initial
* view to transmit that announces the removal of the former coordinator to
- *
+ *
* @param newView
- * @param leaving - members leaving in this view
+ * @param leaving - members leaving in this view
* @param removals - members crashed in this view
*/
synchronized void setInitialView(NetView newView, List<InternalDistributedMember> newMembers,
@@ -1756,7 +1797,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
List<InternalDistributedMember> iJoins;
Set<InternalDistributedMember> iLeaves;
Set<InternalDistributedMember> iRemoves;
- synchronized(this) {
+ synchronized (this) {
iView = initialView;
iJoins = initialJoins;
iLeaves = initialLeaving;
@@ -1774,7 +1815,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* During initial view processing a prepared view was discovered.
* This method will extract its new members and create a new
* initial view containing them.
- *
+ *
* @param v The prepared view
*/
private void processPreparedView(NetView v) {
@@ -1782,7 +1823,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (currentView == null || currentView.getViewId() < v.getViewId()) {
// we have a prepared view that is newer than the current view
// form a new View ID
- int viewId = Math.max(initialView.getViewId(),v.getViewId());
+ int viewId = Math.max(initialView.getViewId(), v.getViewId());
viewId += 1;
NetView newView = new NetView(initialView, viewId);
@@ -1794,13 +1835,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
} else {
newMembers = v.getMembers();
}
- for (InternalDistributedMember newMember: newMembers) {
+ for (InternalDistributedMember newMember : newMembers) {
newView.add(newMember);
newView.setFailureDetectionPort(newMember, v.getFailureDetectionPort(newMember));
}
// use the new view as the initial view
- synchronized(this) {
+ synchronized (this) {
setInitialView(newView, newMembers, initialLeaving, initialRemovals);
}
}
@@ -1813,7 +1854,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
sendInitialView();
long okayToCreateView = System.currentTimeMillis() + requestCollectionInterval;
try {
- for (;;) {
+ for (; ; ) {
synchronized (viewRequests) {
if (shutdown) {
return;
@@ -1903,7 +1944,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
InternalDistributedMember mbr = null;
switch (msg.getDSFID()) {
case JOIN_REQUEST:
- JoinRequestMessage jmsg = (JoinRequestMessage)msg;
+ JoinRequestMessage jmsg = (JoinRequestMessage) msg;
mbr = jmsg.getMemberID();
int port = jmsg.getFailureDetectionPort();
// see if an old member ID is being reused. If
@@ -1944,8 +1985,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
removalReqs.add(mbr);
removalReasons.add(((RemoveMemberMessage) msg).getReason());
} else {
- sendRemoveMessages(Collections.<InternalDistributedMember> singletonList(mbr),
- Collections.<String> singletonList(((RemoveMemberMessage) msg).getReason()), currentView);
+ sendRemoveMessages(Collections.<InternalDistributedMember>singletonList(mbr),
+ Collections.<String>singletonList(((RemoveMemberMessage) msg).getReason()), currentView);
}
}
break;
@@ -1981,7 +2022,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// be reused in an auto-reconnect and get a new vmViewID
mbrs.addAll(joinReqs);
newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs, new HashSet<InternalDistributedMember>(removalReqs));
- for (InternalDistributedMember mbr: joinReqs) {
+ for (InternalDistributedMember mbr : joinReqs) {
if (mbrs.contains(mbr)) {
newView.setFailureDetectionPort(mbr, joinPorts.get(mbr));
}
@@ -2002,14 +2043,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
mbr.setVmViewId(newView.getViewId());
mbr.getNetMember().setSplitBrainEnabled(services.getConfig().isNetworkPartitionDetectionEnabled());
}
-
+
if (isShutdown()) {
return;
}
// send removal messages before installing the view so we stop
// getting messages from members that have been kicked out
sendRemoveMessages(removalReqs, removalReasons, newView);
-
+
prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers());
return;
@@ -2080,7 +2121,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
List<InternalDistributedMember> newMembers = conflictingView.getNewMembers();
if (!newMembers.isEmpty()) {
logger.info("adding these new members from a conflicting view to the new view: {}", newMembers);
- for (InternalDistributedMember mbr: newMembers) {
+ for (InternalDistributedMember mbr : newMembers) {
int port = conflictingView.getFailureDetectionPort(mbr);
newView.add(mbr);
newView.setFailureDetectionPort(mbr, port);
@@ -2089,7 +2130,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
// trump the view ID of the conflicting view so mine will be accepted
if (conflictingView.getViewId() >= newView.getViewId()) {
- newView = new NetView(newView, conflictingView.getViewId()+1);
+ newView = new NetView(newView, conflictingView.getViewId() + 1);
}
}
@@ -2107,7 +2148,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
List<InternalDistributedMember> newMembers = new ArrayList<>(newView.getMembers());
newMembers.removeAll(removalReqs);
NetView tempView = new NetView(localAddress, newView.getViewId() + 1, newMembers, leaveReqs, removalReqs);
- for (InternalDistributedMember mbr: newView.getMembers()) {
+ for (InternalDistributedMember mbr : newView.getMembers()) {
if (tempView.contains(mbr)) {
tempView.setFailureDetectionPort(mbr, newView.getFailureDetectionPort(mbr));
}
@@ -2115,12 +2156,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
newView = tempView;
int size = failures.size();
List<String> reasons = new ArrayList<>(size);
- for (int i=0; i<size; i++) {
+ for (int i = 0; i < size; i++) {
reasons.add("Failed to acknowledge a new membership view and then failed tcp/ip connection attempt");
}
sendRemoveMessages(failures, reasons, newView);
}
-
+
// if there is no conflicting view then we can count
// the current state as being prepared. All members
// who are going to ack have already done so or passed
@@ -2128,13 +2169,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (conflictingView == null) {
prepared = true;
}
-
+
} while (!prepared);
lastConflictingView = null;
sendView(newView, joinReqs);
-
+
// after sending a final view we need to stop this thread if
// the GMS is shutting down
if (isStopping()) {
@@ -2145,7 +2186,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* performs health checks on the collection of members, removing any that
* are found to be healthy
- *
+ *
* @param mbrs
*/
private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException {
@@ -2216,7 +2257,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
List<Future<InternalDistributedMember>> futures;
futures = svc.invokeAll(checkers);
long giveUpTime = System.currentTimeMillis() + viewAckTimeout;
- for (Future<InternalDistributedMember> future: futures) {
+ for (Future<InternalDistributedMember> future : futures) {
long now = System.currentTimeMillis();
try {
InternalDistributedMember mbr = null;
@@ -2242,6 +2283,5 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
-
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0759a87/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java
new file mode 100755
index 0000000..e5bf9e2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ViewRejectMessage extends HighPriorityDistributionMessage {
+
+ private int viewId;
+ private NetView rejectedView;
+ private String reason;
+
+ public ViewRejectMessage(InternalDistributedMember recipient, int viewId, NetView rejectedView, String reason) {
+ super();
+ setRecipient(recipient);
+ this.viewId = viewId;
+ this.rejectedView = rejectedView;
+ this.reason = reason;
+ }
+
+ public ViewRejectMessage() {
+ // no-arg constructor for serialization
+ }
+
+ public int getViewId() {
+ return viewId;
+ }
+
+ public NetView getRejectedView() {
+ return this.rejectedView;
+ }
+
+
+ @Override
+ public int getDSFID() {
+ // TODO Auto-generated method stub
+ return VIEW_REJECT_MESSAGE;
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ @Override
+ public int getProcessorType() {
+ return 0;
+ }
+
+ @Override
+ public void process(DistributionManager dm) {
+ throw new IllegalStateException("this message is not intended to execute in a thread pool");
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ out.writeInt(this.viewId);
+ DataSerializer.writeObject(this.rejectedView, out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.viewId = in.readInt();
+ this.rejectedView = DataSerializer.readObject(in);
+ }
+
+ @Override
+ public String toString() {
+ String s = getSender() == null? getRecipientsDescription() : ""+getSender();
+ return "ViewRejectMessage("+s+"; "+this.viewId+"; rejectedView="+this.rejectedView +")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0759a87/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index 7b263bf..22ac457 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -82,7 +82,8 @@ public interface DataSerializableFixedID extends SerializationVersions {
case FOO:
return new FOO(in);
*/
-
+ public static final short VIEW_REJECT_MESSAGE = -158;
+
public static final short NETWORK_PARTITION_MESSAGE = -157;
public static final short SUSPECT_MEMBERS_MESSAGE = -156;