You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/10/26 22:11:42 UTC
[20/42] incubator-geode git commit: GEODE-288: move admin package to
internal
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/096b622d/geode-core/src/main/java/org/apache/geode/internal/admin/api/impl/AdminDistributedSystemImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/api/impl/AdminDistributedSystemImpl.java b/geode-core/src/main/java/org/apache/geode/internal/admin/api/impl/AdminDistributedSystemImpl.java
new file mode 100755
index 0000000..a247822
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/admin/api/impl/AdminDistributedSystemImpl.java
@@ -0,0 +1,2495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.admin.api.impl;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.internal.admin.api.AdminException;
+import org.apache.geode.internal.admin.api.Alert;
+import org.apache.geode.internal.admin.api.AlertLevel;
+import org.apache.geode.internal.admin.api.AlertListener;
+import org.apache.geode.cache.persistence.PersistentID;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.FutureCancelledException;
+import org.apache.geode.distributed.internal.*;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.Banner;
+import org.apache.geode.internal.admin.*;
+import org.apache.geode.internal.admin.api.AdminDistributedSystem;
+import org.apache.geode.internal.admin.api.BackupStatus;
+import org.apache.geode.internal.admin.api.CacheServer;
+import org.apache.geode.internal.admin.api.CacheServerConfig;
+import org.apache.geode.internal.admin.api.CacheVm;
+import org.apache.geode.internal.admin.api.ConfigurationParameter;
+import org.apache.geode.internal.admin.api.DistributedSystemConfig;
+import org.apache.geode.internal.admin.api.DistributionLocator;
+import org.apache.geode.internal.admin.api.DistributionLocatorConfig;
+import org.apache.geode.internal.admin.api.GemFireHealth;
+import org.apache.geode.internal.admin.api.ManagedEntity;
+import org.apache.geode.internal.admin.api.ManagedEntityConfig;
+import org.apache.geode.internal.admin.api.OperationCancelledException;
+import org.apache.geode.internal.admin.api.RuntimeAdminException;
+import org.apache.geode.internal.admin.api.SystemMember;
+import org.apache.geode.internal.admin.api.SystemMemberCacheListener;
+import org.apache.geode.internal.admin.api.SystemMembershipEvent;
+import org.apache.geode.internal.admin.api.SystemMembershipListener;
+import org.apache.geode.internal.admin.remote.*;
+import org.apache.geode.internal.cache.persistence.PersistentMemberPattern;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.InternalLogWriter;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LogWriterFactory;
+import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.logging.log4j.LogWriterAppender;
+import org.apache.geode.internal.logging.log4j.LogWriterAppenders;
+import org.apache.geode.internal.util.concurrent.FutureResult;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.geode.distributed.ConfigurationProperties.*;
+
+/**
+ * Represents a GemFire distributed system for remote administration/management.
+ *
+ * @since GemFire 3.5
+ */
+public class AdminDistributedSystemImpl
+implements AdminDistributedSystem,
+ org.apache.geode.internal.admin.JoinLeaveListener,
+ org.apache.geode.internal.admin.AlertListener,
+ org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener {
+
+ private static final Logger logger = LogService.getLogger();
+
+ /** String identity of this distributed system */
+ private String id;
+
+ /** Latest alert broadcast by any system members */
+ private Alert latestAlert;
+
+ // -------------------------------------------------------------------------
+
+ /** Internal admin agent to delegate low-level work to */
+ private volatile GfManagerAgent gfManagerAgent;
+
+ /** Monitors the health of this distributed system */
+ private GemFireHealth health;
+
+ /** Set of non-Manager members in this system */
+ private final Set applicationSet = new HashSet();
+
+ /** Set of DistributionLocators for this system */
+ private final Set locatorSet = new HashSet();
+
+ /** Set of dedicated CacheServer members in this system */
+ private final Set cacheServerSet = new HashSet();
+
+ /** Configuration defining this distributed system */
+ private final DistributedSystemConfigImpl config;
+
+ /** Controller for starting and stopping managed entities */
+ private ManagedEntityController controller;
+
+ /** Log file collator for gathering and merging system member logs */
+ private LogCollator logCollator = new LogCollator();
+
+ /** The level above which alerts will be delivered to the alert
+ * listeners */
+ private AlertLevel alertLevel = AlertLevel.WARNING;
+
+ /** The alert listeners registered on this distributed system. */
+ private volatile Set<AlertListener> alertListeners = Collections.emptySet();
+ private final Object alertLock = new Object();
+
+ private LogWriterAppender logWriterAppender;
+
+ private InternalLogWriter logWriter;
+
+ /** The membership listeners registered on this distributed system */
+ private volatile Set membershipListeners = Collections.EMPTY_SET;
+ private final Object membershipLock = new Object();
+
+ /* The region listeners registered on this distributed system */
+ //for feature requests #32887
+ private volatile List cacheListeners = Collections.EMPTY_LIST;
+ private final Object cacheListLock = new Object();
+
+ /**
+ * reference to AdminDistributedSystemImpl instance
+ * for feature requests #32887.
+ * <p>
+ * Guarded by {@link #CONNECTION_SYNC}.
+ * <p>
+ * TODO: reimplement this change and SystemMemberCacheEventProcessor to avoid
+ * using this static. SystemMemberCacheEvents should only be sent to Admin
+ * VMs that express interest.
+ * <p>
+ * This is volatile to allow SystemFailure to deliver fatal poison-pill
+ * to thisAdminDS without waiting on synchronization.
+ *
+ * @guarded.By CONNECTION_SYNC
+ */
+ private static volatile AdminDistributedSystemImpl thisAdminDS;
+
+ /**
+ * Provides synchronization for {@link #connect()} and {@link #disconnect()}.
+ * {@link #thisAdminDS} is also now protected by CONNECTION_SYNC and has its
+ * lifecycle properly tied to connect/disconnect.
+ */
+ private static final Object CONNECTION_SYNC = new Object();
+
+
+ // -------------------------------------------------------------------------
+ // Constructor(s)
+ // -------------------------------------------------------------------------
+
+ /**
+ * Constructs a new AdminDistributedSystemImpl with the given configuration.
+ *
+ * @param config configuration defining this distributed system
+ */
+ public AdminDistributedSystemImpl(DistributedSystemConfigImpl config) {
+
+   // init from config...
+   this.config = config;
+
+   // Choose the identity by precedence: an explicit system id, then the
+   // locators string, then "mcastAddress[mcastPort]". The branches must be
+   // chained with else-if; as separate statements a configured system id
+   // was always overwritten by one of the two fallbacks.
+   String systemId = this.config.getSystemId();
+   if (systemId != null && systemId.length() > 0) {
+     this.id = systemId;
+
+   } else if (this.getLocators() != null && this.getLocators().length() > 0) {
+     this.id = this.getLocators();
+
+   } else {
+     this.id = new StringBuffer(this.getMcastAddress()).append("[").append(
+         this.getMcastPort()).append("]").toString();
+   }
+
+   // LOG: create LogWriterAppender unless one already exists
+   this.logWriterAppender = LogWriterAppenders.getOrCreateAppender(LogWriterAppenders.Identifier.MAIN, false, this.config.createLogConfig(), false);
+
+   // LOG: look in DistributedSystemConfigImpl for existing LogWriter to use
+   InternalLogWriter existingLogWriter = this.config.getInternalLogWriter();
+   if (existingLogWriter != null) {
+     this.logWriter = existingLogWriter;
+   } else {
+     // LOG: create LogWriterLogger
+     this.logWriter = LogWriterFactory.createLogWriterLogger(false, false, this.config.createLogConfig(), false);
+     // The banner is logged unless the INHIBIT_DM_BANNER system property is true.
+     if (!Boolean.getBoolean(InternalLocator.INHIBIT_DM_BANNER)) {
+       // LOG: changed statement from config to info
+       this.logWriter.info(Banner.getString(null));
+     } else {
+       logger.debug("skipping banner - " + InternalLocator.INHIBIT_DM_BANNER + " is set to true");
+     }
+     // Set this log writer in DistributedSystemConfigImpl
+     this.config.setInternalLogWriter(this.logWriter);
+   }
+
+   // set up other details that depend on config attrs...
+   this.controller = ManagedEntityControllerFactory.createManagedEntityController(this);
+   initializeDistributionLocators();
+   initializeCacheServers();
+ }
+
+ // -------------------------------------------------------------------------
+ // Initialization
+ // -------------------------------------------------------------------------
+
+ /**
+ * Creates DistributionLocator instances for every locator entry in the
+ * {@link DistributedSystemConfig}
+ */
+ private void initializeDistributionLocators() {
+ DistributionLocatorConfig[] configs =
+ this.config.getDistributionLocatorConfigs();
+ if (configs.length == 0) {
+ // No work to do
+ return;
+ }
+
+ for (int i = 0; i < configs.length; i++) {
+ // the Locator impl may vary in this class from the config...
+ DistributionLocatorConfig conf = configs[i];
+ DistributionLocator locator =
+ createDistributionLocatorImpl(conf);
+ this.locatorSet.add(new FutureResult(locator));
+ }
+ // update locators string...
+ setLocators(parseLocatorSet());
+ }
+
+ /**
+ * Creates <code>CacheServer</code> instances for every cache server
+ * entry in the {@link
+ * DistributedSystemConfig}
+ */
+ private void initializeCacheServers() {
+ CacheServerConfig[] cacheServerConfigs =
+ this.config.getCacheServerConfigs();
+ for (int i = 0; i < cacheServerConfigs.length; i++) {
+ try {
+ CacheServerConfig conf = cacheServerConfigs[i];
+ CacheServerConfigImpl copy =
+ new CacheServerConfigImpl(conf);
+ this.cacheServerSet.add(new FutureResult(createCacheServer(copy)));
+ } catch (java.lang.Exception e) {
+ logger.warn(e.getMessage(), e);
+ continue;
+ } catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ } catch (java.lang.Error e) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ logger.error(e.getMessage(), e);
+ continue;
+ }
+ }
+ }
+
+ /**
+ * Checks to make sure that {@link #connect()} has been called.
+ *
+ * @throws IllegalStateException
+ * If {@link #connect()} has not been called.
+ */
+ private void checkConnectCalled() {
+   // A non-null agent means connect() has run (and disconnect() has not cleared it).
+   if (this.gfManagerAgent != null) {
+     return;
+   }
+   throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
+ }
+
+ // -------------------------------------------------------------------------
+ // Attributes of this DistributedSystem
+ // -------------------------------------------------------------------------
+
+ /**
+  * Returns the internal admin agent this system delegates to; null while
+  * no agent is set.
+  */
+ public GfManagerAgent getGfManagerAgent() {
+   return gfManagerAgent;
+ }
+
+ /**
+  * Returns true if an admin agent exists and reports itself as connected.
+  */
+ public boolean isConnected() {
+   // Read the volatile field once: disconnect() sets it to null, which could
+   // otherwise happen between the null check and the isConnected() call.
+   final GfManagerAgent agent = this.gfManagerAgent;
+   return agent != null && agent.isConnected();
+ }
+
+ /** Returns the string identity of this distributed system. */
+ public String getId() {
+   return id;
+ }
+
+ /**
+  * Returns the configured system name, falling back to {@link #getId()}
+  * when the name is null or empty.
+  */
+ public String getName() {
+   final String systemName = this.config.getSystemName();
+   final boolean hasName = systemName != null && systemName.length() > 0;
+   return hasName ? systemName : getId();
+ }
+
+ /** Returns the system name exactly as stored in the configuration. */
+ public String getSystemName() {
+   return config.getSystemName();
+ }
+
+ /** Returns the remote command stored in the configuration. */
+ public String getRemoteCommand() {
+   return config.getRemoteCommand();
+ }
+
+ /** Stores the given remote command in the configuration. */
+ public void setRemoteCommand(String remoteCommand) {
+   config.setRemoteCommand(remoteCommand);
+ }
+
+ /**
+  * Records the alert level and, when connected, pushes its severity to the
+  * admin agent first.
+  */
+ public void setAlertLevel(AlertLevel level) {
+   final boolean connected = isConnected();
+   if (connected) {
+     gfManagerAgent.setAlertLevel(level.getSeverity());
+   }
+   alertLevel = level;
+ }
+
+ /** Returns the alert level most recently set on this object. */
+ public AlertLevel getAlertLevel() {
+   return alertLevel;
+ }
+
+ /**
+  * Registers an alert listener. The listener set is replaced by a new copy
+  * rather than mutated in place; already registered listeners are ignored.
+  */
+ public void addAlertListener(AlertListener listener) {
+   synchronized (this.alertLock) {
+     final Set<AlertListener> current = this.alertListeners;
+     if (current.contains(listener)) {
+       return;
+     }
+     final Set<AlertListener> updated = new HashSet<AlertListener>(current);
+     updated.add(listener);
+     this.alertListeners = updated;
+   }
+ }
+
+ /** Returns the number of registered alert listeners. */
+ public int getAlertListenerCount() {
+   synchronized (this.alertLock) {
+     final Set<AlertListener> current = this.alertListeners;
+     return current.size();
+   }
+ }
+
+ /**
+  * Unregisters an alert listener. The listener set is replaced by a new
+  * copy without the listener; unknown listeners are ignored.
+  */
+ public void removeAlertListener(AlertListener listener) {
+   synchronized (this.alertLock) {
+     final Set<AlertListener> current = this.alertListeners;
+     if (!current.contains(listener)) { // fixed bug 34687
+       return;
+     }
+     final Set<AlertListener> updated = new HashSet<AlertListener>(current);
+     if (updated.remove(listener)) {
+       this.alertListeners = updated;
+     }
+   }
+ }
+
+ /**
+  * Registers a membership listener. The listener set is replaced by a new
+  * copy rather than mutated in place; already registered listeners are ignored.
+  */
+ public void addMembershipListener(SystemMembershipListener listener) {
+   synchronized (this.membershipLock) {
+     final Set current = this.membershipListeners;
+     if (current.contains(listener)) {
+       return;
+     }
+     final Set updated = new HashSet(current);
+     updated.add(listener);
+     this.membershipListeners = updated;
+   }
+ }
+
+ /**
+  * Unregisters a membership listener. The listener set is replaced by a new
+  * copy without the listener; unknown listeners are ignored.
+  */
+ public void removeMembershipListener(SystemMembershipListener listener) {
+   synchronized (this.membershipLock) {
+     final Set current = this.membershipListeners;
+     if (!current.contains(listener)) { // fixed bug 34687
+       return;
+     }
+     final Set updated = new HashSet(current);
+     if (updated.remove(listener)) {
+       this.membershipListeners = updated;
+     }
+   }
+ }
+
+ /** Returns the multicast address from the configuration. */
+ public String getMcastAddress() {
+   return config.getMcastAddress();
+ }
+
+ /** Returns the multicast port from the configuration. */
+ public int getMcastPort() {
+   return config.getMcastPort();
+ }
+
+ /** Returns the disable-tcp setting from the configuration. */
+ public boolean getDisableTcp() {
+   return config.getDisableTcp();
+ }
+
+ /** Returns the disable-auto-reconnect setting from the configuration. */
+ public boolean getDisableAutoReconnect() {
+   return config.getDisableAutoReconnect();
+ }
+
+ /** Returns the locators string from the configuration. */
+ public String getLocators() {
+   return config.getLocators();
+ }
+
+ /** Stores the given locators string in the configuration. */
+ protected void setLocators(String locators) {
+   config.setLocators(locators);
+ }
+
+ /** Returns the membership port range from {@link #getConfig()}. */
+ public String getMembershipPortRange() {
+   return getConfig().getMembershipPortRange();
+ }
+
+ /** get the direct-channel port to use, or zero if not set */
+ public int getTcpPort() {
+   // Delegates to the configuration object returned by getConfig().
+   return getConfig().getTcpPort();
+ }
+
+ /** Stores the given TCP port in the configuration returned by {@link #getConfig()}. */
+ public void setTcpPort(int port) {
+   getConfig().setTcpPort(port);
+ }
+
+ /** Stores the given membership port range in the configuration returned by {@link #getConfig()}. */
+ public void setMembershipPortRange(String membershipPortRange) {
+   getConfig().setMembershipPortRange(membershipPortRange);
+ }
+
+ /** Returns the configuration object this distributed system was constructed with. */
+ public DistributedSystemConfig getConfig() {
+   return config;
+ }
+
+ /**
+ * Returns true if any members of this system are currently running.
+ */
+ public boolean isRunning() {
+   // Without an agent nothing can be running; otherwise defer to the
+   // member scan.
+   // is there a better way??
+   // this.gfManagerAgent.isConnected() ... this.gfManagerAgent.isListening()
+   return this.gfManagerAgent != null && isAnyMemberRunning();
+ }
+
+ /** Returns true if this system can use multicast for communications */
+ public boolean isMcastEnabled() {
+   // A positive multicast port is treated as "multicast enabled".
+   final int mcastPort = getMcastPort();
+   return mcastPort > 0;
+ }
+
+ /** Returns the controller created in the constructor for managed entities. */
+ ManagedEntityController getEntityController() {
+   return controller;
+ }
+
+ static private final String TIMEOUT_MS_NAME
+ = "AdminDistributedSystemImpl.TIMEOUT_MS";
+ static private final int TIMEOUT_MS_DEFAULT = 60000; // 30000 -- see bug36470
+ static private final int TIMEOUT_MS
+ = Integer.getInteger(TIMEOUT_MS_NAME, TIMEOUT_MS_DEFAULT).intValue();
+
+
+ // -------------------------------------------------------------------------
+ // Operations of this DistributedSystem
+ // -------------------------------------------------------------------------
+
+ /**
+ * Starts all managed entities in this system.
+ */
+ public void start() throws AdminException {
+   // Wait for each managed entity to start (see bug 32569)
+
+   // Locators first: issue every start request, then wait on each one.
+   final DistributionLocator[] locators = getDistributionLocators();
+   for (DistributionLocator locator : locators) {
+     locator.start();
+   }
+   for (DistributionLocator locator : locators) {
+     try {
+       final boolean started = locator.waitToStart(TIMEOUT_MS);
+       if (!started) {
+         throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_0_DID_NOT_START_AFTER_1_MS.toLocalizedString(new Object[] {locator, Integer.valueOf(TIMEOUT_MS)}));
+       }
+     } catch (InterruptedException ex) {
+       // Restore the interrupt flag before reporting the failure.
+       Thread.currentThread().interrupt();
+       throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED_WHILE_WAITING_FOR_0_TO_START.toLocalizedString(locator), ex);
+     }
+   }
+
+   // Then cache servers, using the same start-all / wait-all sequence.
+   final CacheServer[] cacheServers = getCacheServers();
+   for (CacheServer cacheServer : cacheServers) {
+     cacheServer.start();
+   }
+   for (CacheServer cacheServer : cacheServers) {
+     try {
+       final boolean started = cacheServer.waitToStart(TIMEOUT_MS);
+       if (!started) {
+         throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_0_DID_NOT_START_AFTER_1_MS.toLocalizedString(new Object[] {cacheServer, Integer.valueOf(TIMEOUT_MS)}));
+       }
+     } catch (InterruptedException ex) {
+       Thread.currentThread().interrupt();
+       throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED_WHILE_WAITING_FOR_0_TO_START.toLocalizedString(cacheServer), ex);
+     }
+   }
+ }
+
+ /**
+ * Stops all managed entities (cache servers, then distribution locators) in this system.
+ */
+ public void stop() throws AdminException {
+   // Stop cache server before GemFire managers because the cache
+   // server might host a cache proxy that is dependent on the
+   // manager. See bug 32569.
+
+   // Wait for each managed entity to stop (see bug 32569)
+   final long timeoutSeconds = 30;
+
+   // Cache servers: issue every stop request, then wait on each one.
+   final CacheServer[] cacheServers = getCacheServers();
+   for (CacheServer cacheServer : cacheServers) {
+     cacheServer.stop();
+   }
+   for (CacheServer cacheServer : cacheServers) {
+     try {
+       final boolean stopped = cacheServer.waitToStop(timeoutSeconds * 1000);
+       if (!stopped) {
+         throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_0_DID_NOT_STOP_AFTER_1_SECONDS.toLocalizedString(new Object[] {cacheServer, Long.valueOf(timeoutSeconds)}));
+       }
+     } catch (InterruptedException ex) {
+       // Restore the interrupt flag before reporting the failure.
+       Thread.currentThread().interrupt();
+       throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED_WHILE_WAITING_FOR_0_TO_STOP.toLocalizedString(cacheServer), ex);
+     }
+   }
+
+   // Locators: same stop-all / wait-all sequence.
+   final DistributionLocator[] locators = getDistributionLocators();
+   for (DistributionLocator locator : locators) {
+     locator.stop();
+   }
+   for (DistributionLocator locator : locators) {
+     try {
+       final boolean stopped = locator.waitToStop(timeoutSeconds * 1000);
+       if (!stopped) {
+         throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_0_DID_NOT_STOP_AFTER_1_SECONDS.toLocalizedString(new Object[] {locator, Long.valueOf(timeoutSeconds)}));
+       }
+     } catch (InterruptedException ex) {
+       Thread.currentThread().interrupt();
+       throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED_WHILE_WAITING_FOR_0_TO_STOP.toLocalizedString(locator), ex);
+     }
+   }
+ }
+
+ /** Display merged system member logs */
+ public String displayMergedLogs() {
+   // The collator receives the current agent reference as-is.
+   return logCollator.collateLogs(gfManagerAgent);
+ }
+
+ /**
+ * Returns the license for this GemFire product; else null if unable to
+ * retrieve license information
+ *
+ * @return license for this GemFire product
+ */
+ public java.util.Properties getLicense() {
+   // An empty Properties object is returned whenever a running member
+   // exists; no license values are populated here.
+   final SystemMember runningMember = findFirstRunningMember();
+   return runningMember != null ? new Properties() : null;
+ }
+
+ /**
+ * Sets the distribution-related portion of the given managed entity's
+ * configuration so that the entity is part of this distributed system.
+ *
+ * @throws AdminException
+ * TODO-javadocs
+ */
+ private void setDistributionParameters(SystemMember member)
+ throws AdminException {
+
+ Assert.assertTrue(member instanceof ManagedSystemMemberImpl);
+
+ // set some config parms to match this system...
+ ConfigurationParameter[] configParms = new ConfigurationParameter[] {
+ new ConfigurationParameterImpl(
+ MCAST_PORT,
+ Integer.valueOf(this.config.getMcastPort())),
+ new ConfigurationParameterImpl(
+ LOCATORS,
+ this.config.getLocators()),
+ new ConfigurationParameterImpl(
+ MCAST_ADDRESS,
+ InetAddressUtil.toInetAddress(this.config.getMcastAddress())),
+ new ConfigurationParameterImpl(
+ DISABLE_TCP,
+ Boolean.valueOf(this.config.getDisableTcp()) ),
+ };
+ member.setConfiguration(configParms);
+ }
+
+ /**
+ * Handles an <code>ExecutionException</code> by examining its cause
+ * and throwing an appropriate runtime exception.
+ */
+ private static void handle(ExecutionException ex) {
+ Throwable cause = ex.getCause();
+
+ if (cause instanceof OperationCancelledException) {
+ // Operation was cancelled, we don't necessary want to propagate
+ // this up to the user.
+ return;
+ }
+ if (cause instanceof CancelException) { // bug 37285
+ throw new FutureCancelledException(LocalizedStrings.AdminDistributedSystemImpl_FUTURE_CANCELLED_DUE_TO_SHUTDOWN.toLocalizedString(), ex);
+ }
+
+ // Don't just throw the cause because the stack trace can be
+ // misleading. For instance, the cause might have occurred in a
+ // different thread. In addition to the cause, we also want to
+ // know which code was waiting for the Future.
+ throw new RuntimeAdminException(LocalizedStrings.AdminDistributedSystemImpl_WHILE_WAITING_FOR_FUTURE.toLocalizedString(), ex);
+ }
+
+ protected void checkCancellation() {
+ DM dm = this.getDistributionManager();
+ // TODO does dm == null mean we're dead?
+ if (dm != null) {
+ dm.getCancelCriterion().checkCancelInProgress(null);
+ }
+ }
+ /**
+ * Returns a list of manageable SystemMember instances for each
+ * member of this distributed system.
+ *
+ * @return array of system members for each non-manager member
+ */
+ public SystemMember[] getSystemMemberApplications()
+ throws AdminException {
+ // The whole scan runs while holding the applicationSet monitor.
+ synchronized(this.applicationSet) {
+ Collection coll = new ArrayList(this.applicationSet.size());
+ // Each element of applicationSet is a Future; its result is collected below.
+ APPS: for (Iterator iter = this.applicationSet.iterator();
+ iter.hasNext(); ) {
+ Future future = (Future) iter.next();
+// this.logger.info("DEBUG: getSystemMemberApplications: " + future);
+ // Retry loop: an interrupt while waiting does not abandon the future.
+ for (;;) {
+ checkCancellation();
+ // Clear (and remember) any pending interrupt so future.get() can block.
+ boolean interrupted = Thread.interrupted();
+ try {
+ coll.add(future.get());
+ break;
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+// this.logger.info("DEBUG: cancelled: " + future, ex);
+ // A cancelled future contributes no member; move to the next one.
+ continue APPS;
+ }
+ catch (ExecutionException ex) {
+// this.logger.info("DEBUG: executed: " + future);
+ // handle() either returns (skip this future) or throws a runtime exception.
+ handle(ex);
+ continue APPS;
+ }
+ finally {
+ // Restore the interrupt flag that was cleared or observed above.
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+ } // APPS
+ SystemMember[] array = new SystemMember[coll.size()];
+ coll.toArray(array);
+ return array;
+ }
+ }
+
+ /**
+ * Display in readable format the latest Alert in this distributed system.
+ *
+ * TODO: create an external admin api object for Alert
+ */
+ public String getLatestAlert() {
+   // An empty string stands in for "no alert recorded yet".
+   final Alert alert = this.latestAlert;
+   return alert == null ? "" : alert.toString();
+ }
+
+ /**
+ * Connects to the currently configured system.
+ */
+ public void connect() {
+   // Delegate to the overload, supplying this instance's own log writer.
+   final InternalLogWriter ownLogWriter = this.logWriter;
+   connect(ownLogWriter);
+ }
+
+ /**
+ * Connects to the currently configured system. This method is
+ * public for internal use only (testing, for example).
+ *
+ * <p>
+ *
+ * See {@link
+ * org.apache.geode.distributed.DistributedSystem#connect} for a
+ * list of exceptions that may be thrown.
+ *
+ * @param logWriter the InternalLogWriter to use for any logging
+ */
+ public void connect(InternalLogWriter logWriter) {
+ // CONNECTION_SYNC is static, so connect/disconnect are serialized across all instances.
+ synchronized (CONNECTION_SYNC) {
+ //Check if the gfManagerAgent is NOT null.
+ //If it is already listening, then just return since the connection is already established OR in process.
+ //Otherwise cleanup the state of AdminDistributedSystemImpl. This needs to happen automatically.
+ if(this.gfManagerAgent != null) {
+ if(this.gfManagerAgent.isListening()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The RemoteGfManagerAgent is already listening for this AdminDistributedSystem.");
+ }
+ return;
+ }
+ this.disconnect();
+ }
+
+ // Only one instance may be recorded in the static thisAdminDS at a time.
+ if (thisAdminDS != null) { // TODO: beef up toString and add thisAdminDS
+ throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_ONLY_ONE_ADMINDISTRIBUTEDSYSTEM_CONNECTION_CAN_BE_MADE_AT_ONCE.toLocalizedString());
+ }
+
+ thisAdminDS = this; //added for feature requests #32887
+
+ // Recompute the id: "mcastAddress[mcastPort]" when no locators are set,
+ // otherwise the locators string.
+ if (this.getLocators().length() == 0) {
+ this.id =
+ this.getMcastAddress() + "[" + this.getMcastPort() + "]";
+
+ } else {
+ this.id = this.getLocators();
+ }
+
+ if (this.config instanceof DistributedSystemConfigImpl) {
+ ((DistributedSystemConfigImpl) this.config).validate();
+ ((DistributedSystemConfigImpl) this.config).setDistributedSystem(this);
+ }
+
+ // LOG: passes the AdminDistributedSystemImpl LogWriterLogger into GfManagerAgentConfig for RemoteGfManagerAgent
+ GfManagerAgent agent =
+ GfManagerAgentFactory.getManagerAgent(buildAgentConfig(logWriter));
+ this.gfManagerAgent = agent;
+
+ // sync to prevent bug 33341 Admin API can double-represent system members
+ synchronized(this.membershipListenerLock) {
+ // build the list of applications...
+ ApplicationVM[] apps = this.gfManagerAgent.listApplications();
+ for (int i = 0; i < apps.length; i++) {
+ try {
+ nodeJoined(null, apps[i]);
+ } catch (RuntimeAdminException e) {
+ // Note: this logs through the instance field logWriter, not the method parameter,
+ // and the exception itself is not included in the message.
+ this.logWriter.warning("encountered a problem processing member " + apps[i]);
+ }
+ }
+ }
+
+ // Build admin objects for all locators (see bug 31959)
+ String locators = this.getLocators();
+ StringTokenizer st = new StringTokenizer(locators, ",");
+ NEXT:
+ while(st.hasMoreTokens()) {
+ // Each token is parsed as host[port]; the host part may carry a bind
+ // address after '@' (or after the last ':' when no '@' is present).
+ String locator = st.nextToken();
+ int first = locator.indexOf("[");
+ int last = locator.indexOf("]");
+ String host = locator.substring(0, first);
+ int colidx = host.lastIndexOf('@');
+ if (colidx < 0) {
+ colidx = host.lastIndexOf(':');
+ }
+ String bindAddr = null;
+ if (colidx > 0 && colidx < (host.length()-1)) {
+ String orig = host;
+ bindAddr = host.substring(colidx+1, host.length());
+ host = host.substring(0, colidx);
+ // if the host contains a colon and there's no '@', we probably
+ // parsed an ipv6 address incorrectly - try again
+ if (host.indexOf(':') >= 0) {
+ int bindidx = orig.lastIndexOf('@');
+ if (bindidx >= 0) {
+ host = orig.substring(0, bindidx);
+ bindAddr = orig.substring(bindidx+1);
+ }
+ else {
+ host = orig;
+ bindAddr = null;
+ }
+ }
+ }
+ int port = Integer.parseInt(locator.substring(first+1, last));
+
+ // Look for an existing admin object with the same host and port.
+ synchronized (this.locatorSet) {
+ LOCATORS:
+ for (Iterator iter = this.locatorSet.iterator();
+ iter.hasNext(); ) {
+ Future future = (Future) iter.next();
+ DistributionLocatorImpl impl = null;
+ // Retry loop: an interrupt while waiting does not abandon the future.
+ for (;;) {
+ checkCancellation();
+ boolean interrupted = Thread.interrupted();
+ try {
+ impl = (DistributionLocatorImpl) future.get();
+ break; // success
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+ continue LOCATORS;
+ }
+ catch (ExecutionException ex) {
+ handle(ex);
+ continue LOCATORS;
+ }
+ finally {
+ // Restore the interrupt flag that was cleared or observed above.
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+
+ DistributionLocatorConfig conf = impl.getConfig();
+
+ // Hosts are compared as InetAddress values rather than as raw strings.
+ InetAddress host1 = InetAddressUtil.toInetAddress(host);
+ InetAddress host2 =
+ InetAddressUtil.toInetAddress(conf.getHost());
+ if (port == conf.getPort() && host1.equals(host2)) {
+ // Already have an admin object for this locator
+ continue NEXT;
+ }
+ }
+ }
+
+ // None of the existing locators matches the locator in the
+ // string. Contact the locator to get information and create
+ // an admin object for it.
+ InetAddress bindAddress = null;
+ if (bindAddr != null) {
+ bindAddress = InetAddressUtil.toInetAddress(bindAddr);
+ }
+ DistributionLocatorConfig conf =
+ DistributionLocatorConfigImpl.createConfigFor(host, port,
+ bindAddress);
+ // A null config means no admin object is created for this token.
+ if (conf != null) {
+ DistributionLocator impl =
+ createDistributionLocatorImpl(conf);
+ synchronized (this.locatorSet) {
+ this.locatorSet.add(new FutureResult(impl));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Polls to determine whether or not the connection to the
+ * distributed system has been made.
+ */
+ public boolean waitToBeConnected(long timeout)
+     throws InterruptedException {
+
+   if (Thread.interrupted()) {
+     throw new InterruptedException();
+   }
+
+   checkConnectCalled();
+
+   // Poll every 100 ms until the agent reports it is initialized or the
+   // timeout (in milliseconds) has elapsed.
+   final long begin = System.currentTimeMillis();
+   while (System.currentTimeMillis() - begin < timeout) {
+     if (this.gfManagerAgent.isInitialized()) {
+       return true;
+     }
+     Thread.sleep(100);
+   }
+
+   // Timed out: report the current connection state instead.
+   return isConnected();
+ }
+
+ /**
+ * Closes all connections and resources to the connected distributed system.
+ *
+ * @see org.apache.geode.distributed.DistributedSystem#disconnect()
+ */
+ public void disconnect() {
+ // CONNECTION_SYNC is static, so connect/disconnect are serialized across all instances.
+ synchronized (CONNECTION_SYNC) {
+// if (!isConnected()) {
+// throw new IllegalStateException(this + " is not connected");
+// }
+// Assert.assertTrue(thisAdminDS == this);
+ // The MAIN appender is stopped first and destroyed in the finally block below.
+ if (this.logWriterAppender != null) {
+ LogWriterAppenders.stop(LogWriterAppenders.Identifier.MAIN);
+ }
+ try {
+ // Clear the static reference only if it points at this instance.
+ if (thisAdminDS == this) {
+ thisAdminDS = null;
+ }
+ // The agent is torn down only when it exists and is listening.
+ if (this.gfManagerAgent != null && this.gfManagerAgent.isListening()){
+ // The health monitor is closed under this instance's monitor.
+ synchronized (this) {
+ if (this.health != null) {
+ this.health.close();
+ }
+ }
+ this.gfManagerAgent.removeJoinLeaveListener(this);
+ this.gfManagerAgent.disconnect();
+ }
+ // The agent reference is dropped whether or not it was listening.
+ this.gfManagerAgent = null;
+ if (this.config instanceof DistributedSystemConfigImpl) {
+ ((DistributedSystemConfigImpl) this.config).setDistributedSystem(null);
+ }
+ } finally {
+ // Runs even if the teardown above throws.
+ if (logWriterAppender != null) {
+ LogWriterAppenders.destroy(LogWriterAppenders.Identifier.MAIN);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the DistributionManager this implementation is using to
+ * connect to the distributed system.
+ */
+ public DM getDistributionManager() {
+   // Read the volatile field once: disconnect() sets it to null, which could
+   // otherwise happen between the null check and the getDM() call.
+   final GfManagerAgent agent = this.gfManagerAgent;
+   if (agent == null) {
+     return null;
+   }
+   return agent.getDM();
+ }
+
+ /**
+ * Returns the internal admin API's agent used for administering
+ * this <code>AdminDistributedSystem</code>.
+ *
+ * @since GemFire 4.0
+ */
+ public GfManagerAgent getAdminAgent() {
+   // Same reference that getGfManagerAgent() returns; null while no agent is set.
+   return gfManagerAgent;
+ }
+
+ /**
+ * Adds a new, unstarted <code>DistributionLocator</code> to this
+ * distributed system.
+ */
+ public DistributionLocator addDistributionLocator() {
+   // Start from a default locator configuration.
+   final DistributionLocatorConfig defaultConfig = new DistributionLocatorConfigImpl();
+   final DistributionLocator added = createDistributionLocatorImpl(defaultConfig);
+
+   synchronized (this.locatorSet) {
+     this.locatorSet.add(new FutureResult(added));
+   }
+
+   // update locators string...
+   setLocators(parseLocatorSet());
+   return added;
+ }
+
+ // Returns an array with the result of every locator future in locatorSet;
+ // futures that were cancelled, or whose failure handle() chose to ignore, are skipped.
+ public DistributionLocator[] getDistributionLocators() {
+ // The whole scan runs while holding the locatorSet monitor.
+ synchronized(this.locatorSet) {
+ Collection coll = new ArrayList(this.locatorSet.size());
+ LOCATORS: for (Iterator iter = this.locatorSet.iterator();
+ iter.hasNext();) {
+ Future future = (Future) iter.next();
+ // Retry loop: an interrupt while waiting does not abandon the future.
+ for (;;) {
+ checkCancellation();
+ // Clear (and remember) any pending interrupt so future.get() can block.
+ boolean interrupted = Thread.interrupted();
+ try {
+ coll.add(future.get());
+ break; // success
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+ continue LOCATORS;
+ }
+ catch (ExecutionException ex) {
+ // handle() either returns (skip this future) or throws a runtime exception.
+ handle(ex);
+ continue LOCATORS;
+ }
+ finally {
+ // Restore the interrupt flag that was cleared or observed above.
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+ }
+
+ DistributionLocator[] array =
+ new DistributionLocator[coll.size()];
+ coll.toArray(array);
+ return array;
+ }
+ }
+
+ /**
+ * Updates the locator string that is used to discover members of
+ * the distributed system.
+ *
+ * @see #getLocators
+ */
+ void updateLocatorsString() {
+ this.setLocators(parseLocatorSet());
+ }
+
+  /**
+   * Builds the locators string from the futures held in
+   * <code>locatorSet</code>.
+   * <p>
+   * Each resolved locator contributes one <code>host[port]</code> entry and
+   * entries are separated by ",". Cancelled futures are skipped; a failed
+   * future is passed to <code>handle(ExecutionException)</code> and, should
+   * that call return, skipped as well. The separator is written in front of
+   * every entry but the first, so a skipped trailing future can no longer
+   * leave a dangling "," at the end of the result.
+   *
+   * @return the comma-separated locators string; empty when no locator could
+   *         be resolved
+   */
+  protected String parseLocatorSet() {
+    // method-local buffer, so the unsynchronized StringBuilder is sufficient
+    StringBuilder sb = new StringBuilder();
+    // NOTE(review): unlike getDistributionLocators(), this iteration does not
+    // hold the locatorSet lock - confirm whether callers rely on that.
+    LOCATORS: for (Iterator iter = this.locatorSet.iterator(); iter.hasNext();) {
+      Future future = (Future) iter.next();
+      DistributionLocator locator = null;
+      for (;;) {
+        checkCancellation();
+        boolean interrupted = Thread.interrupted(); // restored in finally
+        try {
+          locator = (DistributionLocator) future.get();
+          break; // success
+        }
+        catch (InterruptedException ex) {
+          interrupted = true;
+          continue; // keep trying
+        }
+        catch (CancellationException ex) {
+          continue LOCATORS;
+        }
+        catch (ExecutionException ex) {
+          handle(ex);
+          continue LOCATORS;
+        }
+        finally {
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+
+      // separate from the previous entry, if there is one
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(locator.getConfig().getHost());
+      sb.append("[").append(locator.getConfig().getPort()).append("]");
+    }
+    return sb.toString();
+  }
+
+ // -------------------------------------------------------------------------
+ // Listener callback methods
+ // -------------------------------------------------------------------------
+
+ /**
+ * Lock held by the nodeJoined, nodeLeft and nodeCrashed callbacks for their
+ * whole body; sync to prevent bug 33341 Admin API can double-represent
+ * system members
+ */
+ private final Object membershipListenerLock = new Object();
+
+ // --------- org.apache.geode.internal.admin.JoinLeaveListener ---------
+ /**
+ * Listener callback for when a member has joined this DistributedSystem.
+ * <p>
+ * React by adding the SystemMember to this system's
+ * internal lists, if they are not already there. Notice that we
+ * add a {@link Future} into the list so that the admin object is
+ * not initialized while the <code>cacheServerSet</code> or
+ * <code>applicationSet</code> lock is held; the future is run afterwards,
+ * still under <code>membershipListenerLock</code>.
+ * <p>
+ * The registered membership listeners are notified only when a new member
+ * was created by this call.
+ *
+ * @param source the distributed system that fired nodeJoined (not read by
+ * this method)
+ * @param vm the VM that joined
+ * @see org.apache.geode.internal.admin.JoinLeaveListener#nodeJoined
+ */
+ public void nodeJoined(GfManagerAgent source, final GemFireVM vm) {
+ // sync to prevent bug 33341 Admin API can double-represent system members
+ synchronized(this.membershipListenerLock) {
+// this.logger.info("DEBUG: nodeJoined: " + vm.getId(), new RuntimeException("STACK"));
+
+ // does it already exist?
+ SystemMember member = findSystemMember(vm);
+
+ // if not then create it...
+ if (member == null) {
+// this.logger.info("DEBUG: no existing member: " + vm.getId());
+ FutureTask future = null;
+ //try {
+ if (vm instanceof ApplicationVM) {
+ final ApplicationVM app = (ApplicationVM) vm;
+ if (app.isDedicatedCacheServer()) {
+ // dedicated cache servers are tracked in cacheServerSet
+ synchronized (this.cacheServerSet) {
+ future = new AdminFutureTask(vm.getId(), new Callable() {
+ public Object call() throws Exception {
+ logger.info(LogMarker.DM, LocalizedMessage.create(LocalizedStrings.AdminDistributedSystemImpl_ADDING_NEW_CACHESERVER_FOR__0, vm));
+ return createCacheServer(app);
+ }
+ });
+
+ this.cacheServerSet.add(future);
+ }
+
+ } else {
+ // every other ApplicationVM is tracked in applicationSet
+ synchronized (this.applicationSet) {
+ future = new AdminFutureTask(vm.getId(), new Callable() {
+ public Object call() throws Exception {
+ logger.info(LogMarker.DM, LocalizedMessage.create(LocalizedStrings.AdminDistributedSystemImpl_ADDING_NEW_APPLICATION_FOR__0, vm));
+ return createSystemMember(app);
+ }
+ });
+ this.applicationSet.add(future);
+ }
+ }
+
+ } else {
+ // NOTE(review): if this assertion ever returned normally, future would
+ // still be null at future.run() below - confirm it always throws.
+ Assert.assertTrue(false, "Unknown GemFireVM type: " +
+ vm.getClass().getName());
+ }
+
+// } catch (AdminException ex) {
+// String s = "Could not create a SystemMember for " + vm;
+// this.logger.warning(s, ex);
+// }
+
+ // Wait for the SystemMember to be created. We want to do this
+ // outside of the "set" locks.
+ future.run(); // executes the Callable on the calling thread
+ for (;;) {
+ checkCancellation();
+ boolean interrupted = Thread.interrupted(); // clears the flag; restored in finally
+ try {
+ member = (SystemMember) future.get();
+ break; // success
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+// this.logger.info("DEBUG: run cancelled: " + future, ex);
+ return; // no listener is notified for a cancelled creation
+ }
+ catch (ExecutionException ex) {
+// this.logger.info("DEBUG: run executed: " + future, ex);
+ handle(ex);
+ return;
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+
+ Assert.assertTrue(member != null);
+
+ // moved this up into the if that creates a new member to fix bug 34517
+ SystemMembershipEvent event = new SystemMembershipEventImpl(member.getDistributedMember());
+ for (Iterator iter = this.membershipListeners.iterator();
+ iter.hasNext(); ) {
+ SystemMembershipListener listener =
+ (SystemMembershipListener) iter.next();
+ listener.memberJoined(event);
+ }
+// } else {
+// this.logger.info("DEBUG: found existing member: " + member);
+ }
+
+ }
+ }
+
+  /**
+   * Listener callback for when a member of this DistributedSystem has left.
+   * <p>
+   * The member is removed from this system's known members and, if it was
+   * known, every registered membership listener receives
+   * <code>memberLeft</code>.
+   *
+   * @param source the distributed system that fired nodeLeft (not read by
+   *        this method)
+   * @param vm the VM that left
+   * @see org.apache.geode.internal.admin.JoinLeaveListener#nodeLeft
+   */
+  public void nodeLeft(GfManagerAgent source, GemFireVM vm) {
+    // sync to prevent bug 33341 Admin API can double-represent system members
+    synchronized (this.membershipListenerLock) {
+      final SystemMember departed = removeSystemMember(vm.getId());
+      if (departed == null) {
+        // reinstated this early-out because removal does not fix 39429
+        return;
+      }
+
+      // The event is built from the VM's id: member.getId() can't be used
+      // because it is nulled-out when the SystemMember is removed.
+      final SystemMembershipEvent leftEvent = new SystemMembershipEventImpl(vm.getId());
+      final Iterator listeners = this.membershipListeners.iterator();
+      while (listeners.hasNext()) {
+        ((SystemMembershipListener) listeners.next()).memberLeft(leftEvent);
+      }
+    }
+  }
+
+  /**
+   * Listener callback for when a member of this DistributedSystem has crashed.
+   * <p>
+   * The member is removed from this system's known members and, if it was
+   * known, every registered membership listener receives
+   * <code>memberCrashed</code>.
+   *
+   * @param source the distributed system that fired nodeCrashed (not read by
+   *        this method)
+   * @param vm the VM that crashed
+   * @see org.apache.geode.internal.admin.JoinLeaveListener#nodeCrashed
+   */
+  public void nodeCrashed(GfManagerAgent source, GemFireVM vm) {
+    // sync to prevent bug 33341 Admin API can double-represent system members
+    synchronized (this.membershipListenerLock) {
+      final SystemMember crashed = removeSystemMember(vm.getId());
+      if (crashed == null) {
+        // an unknown member crashed; nothing to report
+        return;
+      }
+
+      // The event is built from the VM's id: member.getId() can't be used
+      // because it is nulled-out when the SystemMember is removed.
+      final SystemMembershipEvent crashEvent = new SystemMembershipEventImpl(vm.getId());
+      final Iterator listeners = this.membershipListeners.iterator();
+      while (listeners.hasNext()) {
+        ((SystemMembershipListener) listeners.next()).memberCrashed(crashEvent);
+      }
+    }
+  }
+
+ // ----------- org.apache.geode.internal.admin.AlertListener -----------
+  /**
+   * Listener callback invoked when an alert arrives from the distributed
+   * system.
+   * <p>
+   * An alert whose level has a lower <code>ordinal</code> than the configured
+   * <code>alertLevel</code> is dropped. Otherwise the alert is wrapped in an
+   * <code>AlertImpl</code>, recorded as the latest alert and passed to every
+   * registered {@link AlertListener}.
+   *
+   * @param alert the latest alert from the system
+   * @see org.apache.geode.internal.admin.AlertListener#alert
+   */
+  public void alert(org.apache.geode.internal.admin.Alert alert) {
+    final AlertLevel incoming = AlertLevel.forSeverity(alert.getLevel());
+    if (incoming.ordinal < alertLevel.ordinal) {
+      return;
+    }
+
+    final Alert wrapped = new AlertImpl(alert);
+    this.latestAlert = wrapped;
+
+    final Iterator<AlertListener> listeners = this.alertListeners.iterator();
+    while (listeners.hasNext()) {
+      listeners.next().alert(wrapped);
+    }
+  }
+
+  /**
+   * Disconnects this admin distributed system, writing a debug log entry
+   * before and after the call to <code>disconnect()</code>.
+   *
+   * @param sys not read by this implementation
+   */
+  public void onDisconnect(InternalDistributedSystem sys) {
+    logger.debug("Calling AdminDistributedSystemImpl#onDisconnect");
+    this.disconnect();
+    logger.debug("Completed AdminDistributedSystemImpl#onDisconnect");
+  }
+
+ // -------------------------------------------------------------------------
+ // Template methods overridden from superclass...
+ // -------------------------------------------------------------------------
+
+ protected CacheServer createCacheServer(ApplicationVM member)
+ throws AdminException {
+
+ return new CacheServerImpl(this, member);
+ }
+
+ protected CacheServer createCacheServer(CacheServerConfigImpl conf)
+ throws AdminException {
+
+ return new CacheServerImpl(this, conf);
+ }
+
+ /** Override createSystemMember by instantiating SystemMemberImpl
+ *
+ * @throws AdminException TODO-javadocs
+ */
+ protected SystemMember createSystemMember(ApplicationVM app)
+ throws AdminException {
+ return new SystemMemberImpl(this, app);
+ }
+
+ /**
+ * Constructs & returns a SystemMember instance using the corresponding
+ * InternalDistributedMember object.
+ *
+ * @param member
+ * InternalDistributedMember instance for which a SystemMember
+ * instance is to be constructed.
+ * @return constructed SystemMember instance
+ * @throws AdminException
+ * if construction of SystemMember instance fails
+ * @since GemFire 6.5
+ */
+ protected SystemMember createSystemMember(InternalDistributedMember member)
+ throws AdminException {
+ return new SystemMemberImpl(this, member);
+ }
+
+ /**
+ * Template-method for creating a new
+ * <code>DistributionLocatorImpl</code> instance.
+ */
+ protected DistributionLocatorImpl
+ createDistributionLocatorImpl(DistributionLocatorConfig conf) {
+ return new DistributionLocatorImpl(conf, this);
+ }
+
+ // -------------------------------------------------------------------------
+ // Non-public implementation methods... TODO: narrow access levels
+ // -------------------------------------------------------------------------
+
+ // TODO: public void connect(...) could stand to have some internals factored out
+
+ /**
+ * Returns List of Locators including Locators or Multicast.
+ *
+ * @return list of locators or multicast values
+ */
+ protected List parseLocators() {
+
+ // assumes host[port] format, delimited by ","
+ List locatorIds = new ArrayList();
+ if (isMcastEnabled()) {
+ String mcastId = new StringBuffer(
+ this.getMcastAddress()).append("[").append(
+ this.getMcastPort()).append("]").toString();
+ locatorIds.add(new DistributionLocatorId(mcastId));
+ }
+ StringTokenizer st = new StringTokenizer(this.getLocators(), ",");
+ while (st.hasMoreTokens()) {
+ locatorIds.add(new DistributionLocatorId(st.nextToken()));
+ }
+
+ if (logger.isDebugEnabled()) {
+ StringBuffer sb = new StringBuffer("Locator set is: ");
+ for (Iterator iter = locatorIds.iterator(); iter.hasNext(); ) {
+ sb.append(iter.next());
+ sb.append(" ");
+ }
+ logger.debug(sb);
+ }
+
+ return locatorIds;
+ }
+
+ /**
+ * Returns whether or not a <code>SystemMember</code> corresponds
+ * to a <code>GemFireVM</code>.
+ *
+ * @param examineConfig
+ * Should we take the configuration of the member into
+ * consideration? In general, we want to consider the
+ * configuration when a member starts up. But when we are
+ * notified that it has shut down, we do not want to examine
+ * the configuration because that might involve contacting
+ * the member. Which, of course, cannot be done because it
+ * has shut down.
+ */
+ private boolean isSame(SystemMemberImpl member, GemFireVM vm,
+ boolean examineConfig) {
+ if (vm.equals(member.getGemFireVM())) {
+ return true;
+ }
+
+ InternalDistributedMember memberId = member.getInternalId();
+ InternalDistributedMember vmId = vm.getId();
+
+ if (vmId.equals(memberId)) {
+ return true;
+ }
+
+ if ((member instanceof ManagedSystemMemberImpl) &&
+ examineConfig) {
+
+ // We can't compare information about managers because the
+ // member might have already gone away. Attempts to send it
+ // messages (to get its product directory, for instance) will
+ // time out.
+
+ ManagedSystemMemberImpl entity =
+ (ManagedSystemMemberImpl) member;
+
+ // Make sure that the type of the managed entity matches the
+ // type of the internal admin object.
+ if (entity instanceof CacheServer) {
+ if (!(vm instanceof ApplicationVM)) {
+ return false;
+ }
+
+ ApplicationVM app = (ApplicationVM) vm;
+ if (!app.isDedicatedCacheServer()) {
+ return false;
+ }
+ }
+
+ ManagedEntityConfig conf = entity.getEntityConfig();
+ InetAddress managedHost =
+ InetAddressUtil.toInetAddress(conf.getHost());
+ File managedWorkingDir = new File(conf.getWorkingDirectory());
+ File managedProdDir = new File(conf.getProductDirectory());
+
+ InetAddress vmHost = vm.getHost();
+ File vmWorkingDir = vm.getWorkingDirectory();
+ File vmProdDir = vm.getGemFireDir();
+
+ if (vmHost.equals(managedHost) &&
+ isSameFile(vmWorkingDir, managedWorkingDir) &&
+ isSameFile(vmProdDir, managedProdDir)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+  /**
+   * Decides whether two <code>File</code>s name the same file, comparing them
+   * as given, then as absolute files and finally as canonical files.
+   *
+   * @return <code>true</code> if any of the comparisons succeeds;
+   *         <code>false</code> otherwise, including when the canonical form
+   *         cannot be obtained (the <code>IOException</code> is logged)
+   */
+  private boolean isSameFile(File file1, File file2) {
+    if (file1.equals(file2)
+        || file1.getAbsoluteFile().equals(file2.getAbsoluteFile())) {
+      return true;
+    }
+
+    try {
+      return file1.getCanonicalFile().equals(file2.getCanonicalFile());
+
+    } catch (IOException ex) {
+      // oh well...
+      logger.info(LocalizedMessage.create(LocalizedStrings.AdminDistributedSystemImpl_WHILE_GETTING_CANONICAL_FILE), ex);
+      return false;
+    }
+  }
+
+ /**
+ * Finds and returns the <code>SystemMember</code> that corresponds
+ * to the given <code>GemFireVM</code> or <code>null</code> if no
+ * <code>SystemMember</code> corresponds.
+ */
+ protected SystemMember findSystemMember(GemFireVM vm) {
+ return findSystemMember(vm, true);
+ }
+
+ /**
+ * Finds and returns the <code>SystemMember</code> that corresponds to the
+ * given <code>GemFireVM</code> or <code>null</code> if no
+ * <code>SystemMember</code> corresponds.
+ * <p>
+ * The cache servers are searched first; the applications are searched only
+ * if no cache server matched. Each set's lock is held while its futures are
+ * resolved. Cancelled futures are skipped.
+ *
+ * @param vm
+ * GemFireVM instance
+ * @param compareConfig
+ * Should the members' configurations be compared? <code>true</code>
+ * when the member has joined, <code>false</code> when the member has
+ * left. Additionally also used to check if system member config is
+ * to be synchronized with the VM: when <code>true</code>, a matching
+ * member is given <code>vm</code> through <code>setGemFireVM</code>.
+ * @return the matching member, or <code>null</code> if there is none
+ */
+ protected SystemMember findSystemMember(GemFireVM vm,
+ boolean compareConfig) {
+
+ SystemMemberImpl member = null;
+
+ synchronized (this.cacheServerSet) {
+ SERVERS: for (Iterator iter = this.cacheServerSet.iterator();
+ iter.hasNext(); ) {
+ Future future = (Future) iter.next();
+ CacheServerImpl cacheServer = null;
+ for (;;) {
+ checkCancellation();
+ boolean interrupted = Thread.interrupted(); // clears the flag; restored in finally
+ try {
+ cacheServer = (CacheServerImpl) future.get();
+ break; // success
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+ continue SERVERS;
+ }
+ catch (ExecutionException ex) {
+ handle(ex);
+ continue SERVERS;
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+
+ if (isSame(cacheServer, vm, compareConfig)) {
+ member = cacheServer;
+ break;
+ }
+ }
+ }
+
+ // fall back to the applications only when no cache server matched
+ if (member == null) {
+ synchronized (this.applicationSet) {
+ APPS: for (Iterator iter = this.applicationSet.iterator();
+ iter.hasNext(); ) {
+ Future future = (Future) iter.next();
+ SystemMemberImpl application = null;
+ for (;;) {
+ checkCancellation();
+ boolean interrupted = Thread.interrupted(); // clears the flag; restored in finally
+ try {
+ application = (SystemMemberImpl) future.get();
+ break; // success
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+ continue APPS;
+ }
+ catch (ExecutionException ex) {
+ handle(ex);
+ continue APPS;
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+
+ if (isSame(application, vm, compareConfig)) {
+ member = application;
+ break;
+ }
+ } // APPS
+ }
+ }
+
+ // synchronize the matched member with the VM; a failure is only logged
+ if (member != null && compareConfig) {
+ try {
+ member.setGemFireVM(vm);
+
+ } catch (AdminException ex) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.AdminDistributedSystem_COULD_NOT_SET_THE_GEMFIRE_VM), ex);
+ }
+ }
+
+ return member;
+ }
+
+ /**
+ * Removes a SystemMember from this system's list of known members.
+ *
+ * @param systemMember the member to remove
+ * @return the system member that was removed; null if no match was found
+ */
+ protected SystemMember removeSystemMember(SystemMember systemMember) {
+ return removeSystemMember(
+ ((SystemMemberImpl) systemMember).getInternalId());
+ }
+
+  /**
+   * Removes a SystemMember from this system's list of known members. This
+   * method is called in response to a member leaving the system.
+   * <p>
+   * The cache servers are searched first, then (if nothing was found) the
+   * applications. Only a member whose id actually equals
+   * <code>internalId</code> is ever disconnected from its VM and returned: a
+   * member that was merely examined during the search is never carried over
+   * into the result.
+   *
+   * @param internalId the unique id that specifies which member to remove
+   * @return the system member that was removed; null if no match was found,
+   *         or if the matching entry was removed before its member object
+   *         had been created
+   */
+  protected SystemMember removeSystemMember(InternalDistributedMember internalId) {
+    if (internalId == null) {
+      return null;
+    }
+
+    boolean found = false;
+    // assigned only when a member with a matching id has been located
+    SystemMemberImpl removed = null;
+
+    synchronized (this.cacheServerSet) {
+      SERVERS: for (Iterator iter = this.cacheServerSet.iterator();
+          iter.hasNext() && !found; ) {
+        Future future = (Future) iter.next();
+        if (future instanceof AdminFutureTask) {
+          AdminFutureTask task = (AdminFutureTask) future;
+          if (task.getMemberId().equals(internalId)) {
+            future.cancel(true);
+
+          } else {
+            // This is not the member we are looking for...
+            continue SERVERS;
+          }
+        }
+
+        // per-iteration result, so a non-matching member cannot leak out
+        SystemMemberImpl candidate = null;
+        for (;;) {
+          checkCancellation();
+          boolean interrupted = Thread.interrupted(); // restored in finally
+          try {
+            candidate = (SystemMemberImpl) future.get();
+            break; // success
+          } catch (InterruptedException ex) {
+            interrupted = true;
+            continue; // keep trying
+          } catch (CancellationException ex) {
+            continue SERVERS;
+          } catch (ExecutionException ex) {
+            handle(ex);
+            return null; // Dead code
+          } finally {
+            if (interrupted) {
+              Thread.currentThread().interrupt();
+            }
+          }
+        }
+
+        if (candidate != null && internalId.equals(candidate.getInternalId())) {
+          // found a match...
+          iter.remove();
+          removed = candidate;
+          found = true;
+        }
+      } // SERVERS
+    }
+
+    synchronized (this.applicationSet) {
+      for (Iterator iter = this.applicationSet.iterator();
+          iter.hasNext() && !found; ) {
+        Future future = (Future) iter.next();
+        SystemMemberImpl candidate = null;
+        try {
+          if (future instanceof AdminFutureTask) {
+            AdminFutureTask task = (AdminFutureTask) future;
+            if (!task.getMemberId().equals(internalId)) {
+              // This is not the member we are looking for...
+              continue;
+            }
+            iter.remove(); // Only remove applications
+            found = true;
+            if (future.isDone()) {
+              removed = (SystemMemberImpl) future.get();
+            }
+            break;
+          }
+
+          if (!future.isDone()) {
+            // NOTE(review): cancels a pending future without knowing which
+            // member it belongs to - behavior kept from the original code.
+            future.cancel(true);
+            continue; // there is no member to compare against
+          }
+          candidate = (SystemMemberImpl) future.get();
+
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+          checkCancellation();
+          throw new RuntimeException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED.toLocalizedString(), ex);
+
+        } catch (CancellationException ex) {
+          continue;
+
+        } catch (ExecutionException ex) {
+          handle(ex);
+          return null; // Dead code
+        }
+
+        if (candidate != null && internalId.equals(candidate.getInternalId())) {
+          // found a match...
+          iter.remove(); // Only remove applications
+          removed = candidate;
+          found = true;
+        }
+      }
+    }
+
+    if (found) {
+      try {
+        if (removed != null) {
+          removed.setGemFireVM(null);
+        }
+
+      } catch (AdminException ex) {
+        logger.fatal(LocalizedMessage.create(LocalizedStrings.AdminDistributedSystem_UNEXPECTED_ADMINEXCEPTION), ex);
+      }
+      return removed;
+
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Couldn't remove member {}", internalId);
+      }
+      return null;
+    }
+  }
+
+ /**
+ * Builds the configuration needed to connect to a GfManagerAgent which is the
+ * main gateway into the internal.admin api. GfManagerAgent is used to
+ * actually connect to the distributed gemfire system.
+ *
+ * @param logWriter the LogWriterI18n to use for any logging
+ * @return the configuration needed to connect to a GfManagerAgent
+ */
+ // LOG: saves LogWriterLogger from AdminDistributedSystemImpl for RemoteGfManagerAgentConfig
+ private GfManagerAgentConfig buildAgentConfig(InternalLogWriter logWriter) {
+ RemoteTransportConfig conf = new RemoteTransportConfig(
+ isMcastEnabled(), getDisableTcp(),
+ getDisableAutoReconnect(),
+ getBindAddress(), buildSSLConfig(), parseLocators(),
+ getMembershipPortRange(), getTcpPort(), DistributionManager.ADMIN_ONLY_DM_TYPE);
+ return new GfManagerAgentConfig(
+ getSystemName(), conf, logWriter, this.alertLevel.getSeverity(), this, this);
+ }
+
+ protected SSLConfig buildSSLConfig() {
+ SSLConfig conf = new SSLConfig();
+ if (getConfig() != null) {
+ conf.setEnabled(getConfig().isSSLEnabled());
+ conf.setProtocols(getConfig().getSSLProtocols());
+ conf.setCiphers(getConfig().getSSLCiphers());
+ conf.setRequireAuth(getConfig().isSSLAuthenticationRequired());
+ conf.setProperties(getConfig().getSSLProperties());
+ }
+ return conf;
+ }
+
+ /**
+ * Returns the currently configured address to bind to when administering
+ * this system.
+ */
+ private String getBindAddress() {
+ return this.config.getBindAddress();
+ }
+
+  /**
+   * Tells whether the given member is running.
+   *
+   * @param member the member to check
+   * @return the result of <code>isRunning()</code> for a
+   *         <code>ManagedEntity</code>; <code>true</code> for any other
+   *         member, which must be an application VM
+   */
+  private boolean isRunning(SystemMember member) {
+    if (!(member instanceof ManagedEntity)) {
+      // member must be an application VM. It is running
+      return true;
+    }
+    return ((ManagedEntity) member).isRunning();
+  }
+
+ /**
+ * Returns any member manager that is known to be running.
+ * <p>
+ * The cache servers are examined before the applications, each under its
+ * set's lock. Cancelled futures are skipped.
+ *
+ * @return the first member for which <code>isRunning</code> is true, or
+ * <code>null</code> if there is none
+ */
+ private SystemMember findFirstRunningMember() {
+ synchronized(this.cacheServerSet) {
+ SERVERS: for (Iterator iter = this.cacheServerSet.iterator();
+ iter.hasNext();){
+ Future future = (Future) iter.next();
+ SystemMember member = null;
+ for (;;) {
+ checkCancellation();
+ boolean interrupted = Thread.interrupted(); // clears the flag; restored in finally
+ try {
+ member = (SystemMember) future.get();
+ break; // success
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+ continue SERVERS;
+ }
+ catch (ExecutionException ex) {
+ handle(ex);
+ return null; // Dead code
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+
+ if (isRunning(member)) {
+ return member;
+ }
+ }
+ }
+
+ // no running cache server: look at the applications
+ synchronized(this.applicationSet) {
+ APPS: for (Iterator iter = this.applicationSet.iterator();
+ iter.hasNext();) {
+ Future future = (Future) iter.next();
+ SystemMember member = null;
+ for (;;) {
+ checkCancellation();
+ boolean interrupted = Thread.interrupted(); // clears the flag; restored in finally
+ try {
+ member = (SystemMember) future.get();
+ break; // success
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+ continue APPS;
+ }
+ catch (ExecutionException ex) {
+ handle(ex);
+ return null; // Dead code
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+
+ if (isRunning(member)) {
+ return member;
+ }
+ } // APPS
+ }
+
+ return null;
+ }
+
+ /**
+ * Returns the instance of system member that is running either as a CacheVm
+ * or only ApplicationVm for the given string representation of the id.
+ *
+ * @param memberId
+ * string representation of the member identifier
+ * @return instance of system member which could be either as a CacheVm or
+ * Application VM
+ */
+ protected SystemMember findCacheOrAppVmById(String memberId) {
+ SystemMember found = null;
+
+ if (memberId != null) {
+ try {
+ boolean foundSender = false;
+ CacheVm[] cacheVms = getCacheVms();
+
+ /* cacheVms could be null. See
+ * AdminDistributedSystemImpl.getCacheVmsCollection() for
+ * ExecutionException */
+ if (cacheVms != null) {
+ for (CacheVm cacheVm : cacheVms) {
+ if (cacheVm.getId().equals(memberId) &&
+ cacheVm instanceof CacheVm) {
+ found = (SystemMember) cacheVm;
+ foundSender = true;
+ break;
+ }
+ }
+ }
+
+ if (!foundSender) {
+ SystemMember[] appVms = getSystemMemberApplications();
+
+ for (SystemMember appVm : appVms) {
+ if (appVm.getId().equals(memberId) &&
+ appVm instanceof SystemMember) {
+ found = (SystemMember) appVm;
+ foundSender = true;
+ break;
+ }
+ }
+
+ }
+ } catch (AdminException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Could not find System Member for member id: {}", memberId, e);
+ }
+ }
+ }
+
+ return found;
+ }
+
+ /** Returns true if any member application is known to be running */
+ protected boolean isAnyMemberRunning() {
+ return findFirstRunningMember() != null;
+ }
+
+ // -------------------------------------------------------------------------
+ // Health methods
+ // -------------------------------------------------------------------------
+
+  /**
+   * Returns the GemFire health monitor, creating it on first use and again
+   * whenever the current one has been closed. The check and the creation
+   * happen while synchronized on this instance.
+   *
+   * @return the current health monitor
+   * @throws RuntimeAdminException if creating the monitor raises an
+   *         <code>AdminException</code>
+   * @see #createGemFireHealth
+   */
+  public final GemFireHealth getGemFireHealth() {
+    synchronized (this) {
+      final boolean usable = this.health != null && !this.health.isClosed();
+      if (usable) {
+        return this.health;
+      }
+
+      try {
+        this.health = createGemFireHealth(this.gfManagerAgent);
+      } catch (AdminException ex) {
+        throw new RuntimeAdminException(LocalizedStrings.AdminDistributedSystemImpl_AN_ADMINEXCEPTION_WAS_THROWN_WHILE_GETTING_THE_GEMFIRE_HEALTH.toLocalizedString(), ex);
+      }
+      return this.health;
+    }
+  }
+
+  /**
+   * A "template factory" method that creates the <code>GemFireHealth</code>
+   * instance. Subclasses can override it to supply a different
+   * <code>GemFireHealth</code> implementation.
+   *
+   * @param agent the agent the health monitor is created for
+   * @return a new <code>GemFireHealthImpl</code>
+   * @throws IllegalStateException if <code>agent</code> is <code>null</code>
+   * @throws AdminException declared for the health monitor construction
+   * @see #getGemFireHealth
+   */
+  protected GemFireHealth createGemFireHealth(GfManagerAgent agent)
+      throws AdminException {
+    if (agent != null) {
+      return new GemFireHealthImpl(agent, this);
+    }
+    throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_GFMANAGERAGENT_MUST_NOT_BE_NULL.toLocalizedString());
+  }
+
+  /**
+   * Adds a new cache server through <code>addCacheServer()</code> and
+   * returns it as a <code>CacheVm</code>.
+   *
+   * @return the newly added cache VM
+   * @throws AdminException if <code>addCacheServer()</code> fails
+   */
+  public CacheVm addCacheVm() throws AdminException {
+    final CacheServer server = addCacheServer();
+    return (CacheVm) server;
+  }
+
+  /**
+   * Creates a cache server from a freshly constructed
+   * <code>CacheServerConfigImpl</code>, applies this system's distribution
+   * parameters to it and registers it in <code>cacheServerSet</code>.
+   *
+   * @return the newly registered cache server
+   * @throws AdminException if the cache server cannot be created or
+   *         configured
+   */
+  public CacheServer addCacheServer() throws AdminException {
+    final CacheServer added = createCacheServer(new CacheServerConfigImpl());
+    setDistributionParameters(added);
+
+    synchronized (this.cacheServerSet) {
+      this.cacheServerSet.add(new FutureResult(added));
+    }
+    return added;
+  }
+
+ /**
+ * Resolves every future in <code>cacheServerSet</code>, holding that set's
+ * lock, and collects the results.
+ * <p>
+ * Cancelled futures are skipped. A failed future is passed to
+ * <code>handle(ExecutionException)</code>; the <code>return null</code>
+ * after that call is marked as dead code, but callers still check for
+ * <code>null</code>.
+ *
+ * @return the results of all futures that completed normally
+ */
+ private Collection getCacheVmsCollection() throws AdminException {
+ synchronized(this.cacheServerSet) {
+ Collection coll = new ArrayList(this.cacheServerSet.size());
+ SERVERS: for (Iterator iter = this.cacheServerSet.iterator();
+ iter.hasNext(); ) {
+ Future future = (Future) iter.next();
+ Object get = null;
+ for (;;) {
+ checkCancellation();
+ boolean interrupted = Thread.interrupted(); // clears the flag; restored in finally
+ try {
+ get = future.get();
+ break; // success
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ continue; // keep trying
+ }
+ catch (CancellationException ex) {
+ continue SERVERS;
+ }
+ catch (ExecutionException ex) {
+ handle(ex);
+ return null; // Dead code
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+ coll.add(get);
+ } // SERVERS
+ return coll;
+ }
+ }
+
+  /**
+   * Returns all the cache server members of the distributed system which are
+   * hosting a client queue for the particular durable-client having the given
+   * durableClientId.
+   * <p>
+   * Cache servers without a VM are ignored. Because
+   * <code>getCacheServers()</code> has a code path that returns
+   * <code>null</code>, that case is treated as "no cache servers" instead of
+   * being dereferenced.
+   *
+   * @param durableClientId
+   *          durable-id of the client
+   * @return array of CacheServer(s) having the queue for the durable client;
+   *         empty when there is none
+   * @throws AdminException if the cache servers cannot be obtained
+   *
+   * @since GemFire 5.6
+   */
+  public CacheServer[] getCacheServers(String durableClientId)
+      throws AdminException {
+    final List<CacheServer> serversForDurableClient = new ArrayList<CacheServer>();
+    final CacheServer[] servers = getCacheServers();
+
+    if (servers != null) {
+      for (CacheServer server : servers) {
+        RemoteApplicationVM vm =
+            (RemoteApplicationVM) ((CacheServerImpl) server).getGemFireVM();
+        if (vm != null && vm.hasDurableClient(durableClientId)) {
+          serversForDurableClient.add(server);
+        }
+      }
+    }
+
+    return serversForDurableClient.toArray(
+        new CacheServer[serversForDurableClient.size()]);
+  }
+
+  /**
+   * Returns the known cache servers as an array of <code>CacheVm</code>.
+   *
+   * @return the cache VMs, or <code>null</code> when the underlying
+   *         collection is <code>null</code>
+   * @throws AdminException if the cache VMs cannot be collected
+   */
+  public CacheVm[] getCacheVms() throws AdminException {
+    final Collection vms = getCacheVmsCollection();
+    if (vms == null) {
+      return null;
+    }
+    return (CacheVm[]) vms.toArray(new CacheVm[vms.size()]);
+  }
+  /**
+   * Returns the known cache servers as an array of <code>CacheServer</code>.
+   *
+   * @return the cache servers, or <code>null</code> when the underlying
+   *         collection is <code>null</code>
+   * @throws AdminException if the cache servers cannot be collected
+   */
+  public CacheServer[] getCacheServers() throws AdminException {
+    final Collection servers = getCacheVmsCollection();
+    if (servers == null) {
+      return null;
+    }
+    return (CacheServer[]) servers.toArray(new CacheServer[servers.size()]);
+  }
+
+ // -------------------------------------------------------------------------
+ // Overridden java.lang.Object methods
+ // -------------------------------------------------------------------------
+
+  /**
+   * Represents this object by its name.
+   *
+   * @return the value of <code>getName()</code>
+   */
+  @Override // GemStoneAddition
+  public String toString() {
+    final String name = getName();
+    return name;
+  }
+
+  /**
+   * Returns the instance of AdminDistributedSystem that is currently
+   * connected. See <code>thisAdminDS</code>. (for feature requests #32887)
+   * <p>
+   * The static reference is read while synchronized on
+   * <code>CONNECTION_SYNC</code>.
+   * <p>
+   * TODO: remove this static method during reimplementation of
+   * {@link SystemMemberCacheEventProcessor}
+   *
+   * @return AdminDistributedSystem
+   */
+  public static AdminDistributedSystemImpl getConnectedInstance() {
+    final AdminDistributedSystemImpl connected;
+    synchronized (CONNECTION_SYNC) {
+      connected = thisAdminDS;
+    }
+    return connected;
+  }
+
+  /**
+   * Registers a cache listener unless it is already registered.
+   * <p>
+   * The listener list is never modified in place: a copy with the new
+   * listener replaces it, which allows iteration without concurrent
+   * modification worries.
+   *
+   * @param listener the listener to add
+   */
+  public void addCacheListener(SystemMemberCacheListener listener) {
+    synchronized (this.cacheListLock) {
+      final List current = this.cacheListeners;
+      if (current.contains(listener)) {
+        return;
+      }
+      final List extended = new ArrayList(current);
+      extended.add(listener);
+      this.cacheListeners = extended;
+    }
+  }
+
+  /**
+   * Unregisters a cache listener if it is registered.
+   * <p>
+   * The listener list is not modified in place: a copy without the listener
+   * replaces it, or <code>Collections.EMPTY_LIST</code> when no listener
+   * remains.
+   *
+   * @param listener the listener to remove
+   */
+  public void removeCacheListener(SystemMemberCacheListener listener) {
+    synchronized (this.cacheListLock) {
+      final List current = this.cacheListeners;
+      if (!current.contains(listener)) {
+        return;
+      }
+      List reduced = new ArrayList(current);
+      if (!reduced.remove(listener)) {
+        return;
+      }
+      if (reduced.isEmpty()) {
+        reduced = Collections.EMPTY_LIST;
+      }
+      this.cacheListeners = reduced;
+    }
+  }
+
+  /**
+   * Returns the list of cache listeners currently held by this instance. The
+   * list itself is returned, not a copy.
+   *
+   * @return the current cache listener list
+   */
+  public List getCacheListeners() {
+    final List listeners = this.cacheListeners;
+    return listeners;
+  }
+
+  /**
+   * Finds the system member application whose distributed member equals the
+   * given one.
+   *
+   * @param distributedMember the distributed member to look for
+   * @return the matching member, or <code>null</code> when
+   *         <code>distributedMember</code> is <code>null</code> or no
+   *         application matches
+   * @throws AdminException if the member applications cannot be obtained
+   */
+  public SystemMember lookupSystemMember(DistributedMember distributedMember)
+      throws AdminException {
+    if (distributedMember == null) {
+      return null;
+    }
+    for (SystemMember candidate : getSystemMemberApplications()) {
+      if (distributedMember.equals(candidate.getDistributedMember())) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+ //////////////////////// Inner Classes ////////////////////////
+
+ /**
+  * Adapts an <code>internal.admin.Alert</code> to the external
+  * <code>admin.Alert</code> interface. Most accessors simply delegate to the
+  * wrapped alert; the associated {@link SystemMember} is resolved once, in
+  * the constructor.
+  */
+ public class AlertImpl implements Alert {
+   /** The wrapped alert to which most behavior is delegated */
+   private final org.apache.geode.internal.admin.Alert alert;
+   /** Member associated with the alert; may be <code>null</code> */
+   private SystemMember systemMember;
+
+   /////////////////////// Constructors ///////////////////////
+
+   /**
+    * Creates a new <code>Alert</code> that delegates to the given object and
+    * eagerly resolves the system member it relates to.
+    */
+   AlertImpl(org.apache.geode.internal.admin.Alert alert) {
+     this.alert = alert;
+
+     /*
+      * Related to #39657.
+      * Avoid setting GemFireVM again in the system member.
+      * Eager initialization of member variable - systemMember.
+      */
+     SystemMember resolved = null;
+     GemFireVM vm = alert.getGemFireVM();
+     if (vm != null) {
+       resolved = findSystemMember(vm, false);
+     }
+
+     if (resolved == null) {
+       /*
+        * Fall back to the sender information to construct a SystemMember
+        * that can be used for display purposes at least.
+        */
+       InternalDistributedMember sender = alert.getSender();
+       if (sender != null) {
+         try {
+           resolved = AdminDistributedSystemImpl.this.createSystemMember(sender);
+         } catch (AdminException e) {
+           // creation of the SystemMember instance failed; leave it unset
+           resolved = null;
+         }
+       }
+     }
+     this.systemMember = resolved;
+   }
+
+   ////////////////////// Instance Methods //////////////////////
+
+   public AlertLevel getLevel() {
+     return AlertLevel.forSeverity(this.alert.getLevel());
+   }
+
+   /*
+    * The system member is resolved eagerly when this alert is created; this
+    * accessor only returns the stored value.
+    */
+   public SystemMember getSystemMember() {
+     return this.systemMember;
+   }
+
+   public String getConnectionName() {
+     return this.alert.getConnectionName();
+   }
+
+   public String getSourceId() {
+     return this.alert.getSourceId();
+   }
+
+   public String getMessage() {
+     return this.alert.getMessage();
+   }
+
+   public java.util.Date getDate() {
+     return this.alert.getDate();
+   }
+
+   @Override
+   public String toString() {
+     return this.alert.toString();
+   }
+ }
+
+ /**
+  * A JSR-166 <code>FutureTask</code> whose {@link #get} method
+  * properly handles an <code>ExecutionException</code> that wraps an
+  * <code>InterruptedException</code>. This is necessary because
+  * there are places in the admin API that wrap
+  * <code>InterruptedException</code>s. See bug 32634.
+  *
+  * <P>
+  *
+  * This is by no means an ideal solution to this problem. It would
+  * be better to modify the code invoked by the <code>Callable</code>
+  * to explicitly throw <code>InterruptedException</code>.
+  */
+ static class AdminFutureTask extends FutureTask {
+
+   /**
+    * The id of the member whose admin object we are creating.
+    * Keeping track of this allows us to cancel a FutureTask for a
+    * member that has gone away.
+    */
+   private final InternalDistributedMember memberId;
+
+   public AdminFutureTask(InternalDistributedMember memberId,
+                          Callable callable) {
+     super(callable);
+     this.memberId = memberId;
+   }
+
+   /**
+    * Returns the id of the member of the distributed system for
+    * which this <code>FutureTask</code> is doing work.
+    */
+   public InternalDistributedMember getMemberId() {
+     return memberId;
+   }
+
+   /**
+    * Waits for the task result. If the calling thread is already
+    * interrupted, an <code>InterruptedException</code> is thrown
+    * immediately. If the task failed with an
+    * <code>ExecutionException</code> whose cause chain contains an
+    * <code>InterruptedException</code>, a
+    * <code>CancellationException</code> carrying that cause's stack trace
+    * is thrown instead; any other <code>ExecutionException</code> is
+    * rethrown unchanged.
+    */
+   @Override
+   public Object get() throws InterruptedException, ExecutionException {
+     if (Thread.interrupted()) {
+       throw new InterruptedException();
+     }
+     try {
+       return super.get();
+     } catch (ExecutionException failure) {
+       Throwable cause = failure.getCause();
+       while (cause != null) {
+         if (cause instanceof InterruptedException) {
+           // We interrupted the runnable but we don't want the thread
+           // that called get to think it was interrupted.
+           CancellationException cancelled = new CancellationException(LocalizedStrings.AdminDistributedSystemImpl_BY_INTERRUPT.toLocalizedString());
+           cancelled.setStackTrace(cause.getStackTrace());
+           throw cancelled;
+         }
+         cause = cause.getCause();
+       }
+       throw failure;
+     }
+   }
+
+ }
+
+ /**
+  * Returns the id reported by this system's distribution manager.
+  *
+  * @return the result of <code>getDistributionManager().getId()</code>
+  */
+ public DistributedMember getDistributedMember() {
+   final DM manager = getDistributionManager();
+   return manager.getId();
+ }
+
+ /**
+  * Connects this admin distributed system using its log writer and then
+  * waits on <code>thisAdminDS</code> for the connection to be established
+  * (argument 3000; presumably milliseconds - confirm against
+  * <code>waitToBeConnected</code>).
+  * <p>
+  * If the wait is interrupted, a warning is logged and the thread's
+  * interrupt status is restored so that callers further up the stack can
+  * still observe the interruption.
+  */
+ private void connectAdminDS() {
+   connect((InternalLogWriter)this.logWriter);
+   try {
+     thisAdminDS.waitToBeConnected(3000);
+   } catch (InterruptedException ie) {
+     logger.warn("Interrupted while waiting to connect", ie);
+     // catching InterruptedException clears the flag; set it again
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Connects (if necessary) and asks the distributed system for its missing
+  * persistent members.
+  *
+  * @return the result of {@link #getMissingPersistentMembers(DM)}
+  * @throws IllegalStateException if no distribution manager is available
+  *         after connecting
+  * @throws AdminException declared by the signature
+  */
+ public Set<PersistentID> getMissingPersistentMembers()
+     throws AdminException {
+   connectAdminDS();
+   final DM manager = getDistributionManager();
+   if (manager != null) {
+     return getMissingPersistentMembers(manager);
+   }
+   throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
+ }
+
+ /**
+  * Sends a <code>MissingPersistentIDsRequest</code> through the given
+  * distribution manager and returns its result.
+  *
+  * @param dm the distribution manager used to send the request
+  * @return whatever <code>MissingPersistentIDsRequest.send</code> returns
+  */
+ public static Set<PersistentID> getMissingPersistentMembers(DM dm) {
+   final Set<PersistentID> missing = MissingPersistentIDsRequest.send(dm);
+   return missing;
+ }
+
+ /**
+  * Connects (if necessary) and revokes the persistent member identified by
+  * host and directory, delegating to
+  * {@link #revokePersistentMember(DM, InetAddress, String)}.
+  *
+  * @param host host of the persistent member
+  * @param directory directory of the persistent member
+  * @throws IllegalStateException if no distribution manager is available
+  *         after connecting
+  * @throws AdminException declared by the signature
+  */
+ public void revokePersistentMember(InetAddress host,
+     String directory) throws AdminException {
+   connectAdminDS();
+   final DM manager = getDistributionManager();
+   if (manager == null) {
+     throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
+   }
+   revokePersistentMember(manager, host, directory);
+ }
+
+ /**
+  * Connects (if necessary) and revokes the persistent member with the given
+  * disk store id, delegating to {@link #revokePersistentMember(DM, UUID)}.
+  *
+  * @param diskStoreID id of the disk store to revoke
+  * @throws IllegalStateException if no distribution manager is available
+  *         after connecting
+  * @throws AdminException declared by the signature
+  */
+ public void revokePersistentMember(UUID diskStoreID) throws AdminException {
+   connectAdminDS();
+   final DM manager = getDistributionManager();
+   if (manager == null) {
+     throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
+   }
+   revokePersistentMember(manager, diskStoreID);
+ }
+
+ /**
+ * Revokes the persistent member with the given disk store id, but only if
+ * that id is currently reported by {@link #getMissingPersistentMembers(DM)}.
+ * <p>
+ * The revoke is done in two steps: a prepare request is sent first, and the
+ * actual revoke request is sent only if the prepare completed without
+ * throwing. In every other case - the id was not found among the missing
+ * members, or the lookup or the prepare threw - the prepare is cancelled
+ * in the <code>finally</code> block.
+ *
+ * @param dm the distribution manager used to send the requests
+ * @param diskStoreID id of the disk store to revoke
+ */
+ public static void revokePersistentMember(DM dm, UUID diskStoreID) {
+ PersistentMemberPattern pattern = new PersistentMemberPattern(diskStoreID);
+ // set to true only after the prepare request has been sent successfully
+ boolean success = false;
+ try {
+ // make sure that the disk store we're revoking is actually missing
+ boolean found = false;
+ Set<PersistentID> details = getMissingPersistentMembers(dm);
+ if (details != null) {
+ for (PersistentID id : details) {
+ if (id.getUUID().equals(diskStoreID)) {
+ found = true;
+ break;
+ }
+ }
+ }
+ if (!found) {
+ // nothing to revoke; note that the finally block below still runs
+ // with success == false and therefore sends the cancel
+ return;
+ }
+
+ // Fix for 42607 - verify that the persistent id is not already
+ // running before revoking it.
+ PrepareRevokePersistentIDRequest.send(dm, pattern);
+ success = true;
+ } finally {
+ if(success) {
+ // revoke the persistent member if we were able to prepare the revoke
+ RevokePersistentIDRequest.send(dm, pattern);
+ } else {
+ // otherwise, cancel the revoke.
+ PrepareRevokePersistentIDRequest.cancel(dm, pattern);
+ }
+ }
+ }
+
+ /**
+ * Revokes the persistent member described by a host and a directory.
+ * <p>
+ * A prepare request is sent first; the actual revoke request is sent only
+ * if the prepare completed without throwing, otherwise the prepare is
+ * cancelled. Unlike the UUID based variant, this method does not first
+ * check that the member is reported as missing.
+ *
+ * @param dm the distribution manager used to send the requests
+ * @param host host of the persistent member
+ * @param directory directory of the persistent member
+ * @deprecated use {@link #revokePersistentMember(UUID)} instead
+ */
+ public static void revokePersistentMember(DM dm, InetAddress host, String directory) {
+
+ // the pattern also carries the current time; NOTE(review): presumably a
+ // revocation timestamp - confirm against PersistentMemberPattern
+ PersistentMemberPattern pattern = new PersistentMemberPattern(host, directory, System.currentTimeMillis());
+ // set to true only after the prepare request has been sent successfully
+ boolean success = false;
+ try {
+ // Fix for 42607 - verify that the persistent id is not already
+ // running before revoking it.
+ PrepareRevokePersistentIDRequest.send(dm, pattern);
+ success = true;
+ } finally {
+ if(success) {
+ // revoke the persistent member if we were able to prepare the revoke
+ RevokePersistentIDRequest.send(dm, pattern);
+ } else {
+ // otherwise, cancel the revoke.
+ PrepareRevokePersistentIDRequest.cancel(dm, pattern);
+ }
+ }
+ }
+
+ /**
+  * Shuts down all members, passing a timeout of 0 to
+  * {@link #shutDownAllMembers(long)}.
+  *
+  * @return the result of <code>shutDownAllMembers(0)</code>
+  * @throws AdminException declared by the signature
+  */
+ public Set shutDownAllMembers() throws AdminException {
+   final long timeout = 0;
+   return shutDownAllMembers(timeout);
+ }
+
+ /**
+  * Connects (if necessary) and shuts down all members, delegating to
+  * {@link #shutDownAllMembers(DM, long)}.
+  *
+  * @param timeout timeout value forwarded to the static variant
+  * @return the result of the static variant
+  * @throws IllegalStateException if no distribution manager is available
+  *         after connecting
+  * @throws AdminException declared by the signature
+  */
+ public Set shutDownAllMembers(long timeout) throws AdminException {
+   connectAdminDS();
+   final DM manager = getDistributionManager();
+   if (manager != null) {
+     return shutDownAllMembers(manager, timeout);
+   }
+   throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
+ }
+
+ /**
+  * Shuts down all members by sending a <code>ShutdownAllRequest</code>.
+  *
+  * @param dm the distribution manager used to send the request
+  * @param timeout the amount of time (in ms) to spend trying to shut down
+  *        the members gracefully. After this time period, the members will
+  *        be forcibly shut down. If the timeout is exceeded, persistent
+  *        recovery after the shutdown may need to do a GII. -1 indicates
+  *        that the shutdown should wait forever.
+  * @return whatever <code>ShutdownAllRequest.send</code> returns
+  */
+ public static Set shutDownAllMembers(DM dm, long timeout) {
+   final Set result = ShutdownAllRequest.send(dm, timeout);
+   return result;
+ }
+
+ /**
+  * Backs up all members into the given directory without a baseline
+  * directory, i.e. passes <code>null</code> as the baseline to
+  * {@link #backupAllMembers(File, File)}.
+  *
+  * @param targetDir directory forwarded as the backup target
+  * @return the resulting backup status
+  * @throws AdminException if the delegate throws it
+  */
+ public BackupStatus backupAllMembers(File targetDir) throws AdminException {
+   final File baselineDir = null;
+   return backupAllMembers(targetDir, baselineDir);
+ }
+
+ /**
+  * Connects (if necessary) and backs up all members, delegating to
+  * {@link #backupAllMembers(DM, File, File)}.
+  *
+  * @param targetDir directory forwarded as the backup target
+  * @param baselineDir baseline directory forwarded to the delegate; may be
+  *        <code>null</code>
+  * @return the resulting backup status
+  * @throws IllegalStateException if no distribution manager is available
+  *         after connecting
+  * @throws AdminException if the delegate throws it
+  */
+ public BackupStatus backupAllMembers(File targetDir, File baselineDir) throws AdminException {
+   connectAdminDS();
+   final DM manager = getDistributionManager();
+   if (manager != null) {
+     return backupAllMembers(manager, targetDir, baselineDir);
+   }
+   throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
+ }
+
+ /**
+  * Backs up all members while holding the backup lock obtained through
+  * <code>BackupDataStoreHelper</code>.
+  * <p>
+  * The backup is written to a sub-directory of <code>targetDir</code> named
+  * after the current time (pattern <code>yyyy-MM-dd-HH-mm-ss</code>). The
+  * returned status combines the members that were backed up successfully
+  * with the set of missing members, adjusted as described in the body. The
+  * lock is always released before this method returns or throws.
+  *
+  * @param dm the distribution manager used for all requests
+  * @param targetDir parent directory for the time-stamped backup directory
+  * @param baselineDir baseline directory passed through to the helper
+  * @return the status of the backup
+  * @throws AdminException if the backup lock could not be obtained
+  */
+ public static BackupStatus backupAllMembers(DM dm, File targetDir, File baselineDir)
+     throws AdminException {
+   if (!BackupDataStoreHelper.obtainLock(dm)) {
+     throw new AdminException(LocalizedStrings.DistributedSystem_BACKUP_ALREADY_IN_PROGRESS.toLocalizedString());
+   }
+   try {
+     Set<PersistentID> missing = getMissingPersistentMembers(dm);
+     Set recipients = dm.getOtherDistributionManagerIds();
+
+     String timestamp = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date());
+     targetDir = new File(targetDir, timestamp);
+     BackupDataStoreResult result =
+         BackupDataStoreHelper.backupAllMembers(dm, recipients, targetDir, baselineDir);
+
+     // Members may still have been creating/recovering regions when the
+     // missing members were collected, yet be ready by the time the backup
+     // finished. Members that were backed up successfully therefore
+     // override the earlier "missing" verdict.
+     for (Set<PersistentID> onlineIds : result.getSuccessfulMembers().values()) {
+       missing.removeAll(onlineIds);
+     }
+
+     // Existing data stores that did not end up among the successful
+     // members are reported as missing.
+     result.getExistingDataStores().keySet().removeAll(result.getSuccessfulMembers().keySet());
+     for (Set<PersistentID> lostIds : result.getExistingDataStores().values()) {
+       missing.addAll(lostIds);
+     }
+
+     return new BackupStatusImpl(result.getSuccessfulMembers(), missing);
+   } finally {
+     BackupDataStoreHelper.releaseLock(dm);
+   }
+ }
+
+ /**
+  * Connects (if necessary) and compacts all disk stores, delegating to
+  * {@link #compactAllDiskStores(DM)}.
+  *
+  * @return the result of the static variant
+  * @throws IllegalStateException if no distribution manager is available
+  *         after connecting
+  * @throws AdminException if the delegate throws it
+  */
+ public Map<DistributedMember, Set<PersistentID>> compactAllDiskStores() throws AdminException {
+   connectAdminDS();
+   final DM manager = getDistributionManager();
+   if (manager != null) {
+     return compactAllDiskStores(manager);
+   }
+   throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
+ }
+
+ /**
+  * Sends a <code>CompactRequest</code> through the given distribution
+  * manager and returns its result.
+  *
+  * @param dm the distribution manager used to send the request
+  * @return whatever <code>CompactRequest.send</code> returns
+  * @throws AdminException declared by the signature
+  */
+ public static Map<DistributedMember, Set<PersistentID>> compactAllDiskStores(DM dm)
+     throws AdminException {
+   final Map<DistributedMember, Set<PersistentID>> compacted = CompactRequest.send(dm);
+   return compacted;
+ }
+
+ /**
+ * This method can be used to process ClientMembership events sent for
+ * BridgeMembership by bridge servers to all admin members.
+ *
+ * NOTE: Not implemented in this class; the body is empty. The JMX
+ * implementation, which is a subclass of this class (i.e.
+ * AdminDistributedSystemJmxImpl), implements it.
+ *
+ * @param senderId
+ * id of the member that sent the ClientMembership changes for
+ * processing (could be null)
+ * @param clientId
+ * id of a client for which the notification was sent
+ * @param clientHost
+ * host on which the client is/was running
+ * @param eventType
+ * denotes whether the client Joined/Left/Crashed; should be one of
+ * ClientMembershipMessage#JOINED, ClientMembershipMessage#LEFT,
+ * ClientMembershipMessage#CRASHED
+ */
+ public void processClientMembership(String senderId, String clientId,
+ String clientHost, int eventType) {
+ // intentionally empty - see the NOTE in the method comment
+ }
+
+ public void setAlertLevelAsString(String level) {
+ AlertLevel newAlertLevel = AlertLevel.forName(level);
+
+ if (newAlertLevel != null) {
+ setAlertLevel(newAlertLevel);
+ } else {
+ System.out.println("ERROR:: "+level+" is invalid. Allowed alert levels are: WARNING, ERROR, SEVERE, OFF");
+ throw new IllegalArgumentException(LocalizedStrings.DEBUG.toLocalizedString(level+" is invalid. Allowed alert levels are: WARNING, ERROR, SEVERE, OFF"));
+ }
+ }
+
+ /**
+  * Returns the name of the current alert level.
+  *
+  * @return the result of <code>getAlertLevel().getName()</code>
+  */
+ public String getAlertLevelAsString() {
+   final AlertLevel current = getAlertLevel();
+   return current.getName();
+ }
+}
+