You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:52 UTC
[34/51] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
index cd4f3e4,0000000..000120b
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
@@@ -1,821 -1,0 +1,832 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.InvalidValueException;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.ClientSession;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.DynamicRegionFactory;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.ExpirationAction;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.InterestRegistrationListener;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
+import com.gemstone.gemfire.cache.server.ServerLoadProbe;
+import com.gemstone.gemfire.cache.server.internal.LoadMonitor;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.OSProcess;
+import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion;
+import com.gemstone.gemfire.internal.cache.CacheServerAdvisor.CacheServerProfile;
+import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+import com.gemstone.gemfire.internal.cache.tier.Acceptor;
+import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.management.membership.ClientMembership;
+import com.gemstone.gemfire.management.membership.ClientMembershipListener;
+
+/**
+ * An implementation of the <code>CacheServer</code> interface that delegates
+ * most of the heavy lifting to an {@link Acceptor}.
+ *
+ * @author David Whitlock
+ * @since 4.0
+ */
+@SuppressWarnings("deprecation")
+public class CacheServerImpl
+ extends AbstractCacheServer
+ implements DistributionAdvisee {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private static final int FORCE_LOAD_UPDATE_FREQUENCY= Integer.getInteger("gemfire.BridgeServer.FORCE_LOAD_UPDATE_FREQUENCY", 10).intValue();
+
+ /** The acceptor that does the actual serving */
+ private volatile AcceptorImpl acceptor;
+
+ /**
+ * The advisor used by this cache server.
+ * @since 5.7
+ */
+ private volatile CacheServerAdvisor advisor;
+
+ /**
+ * The monitor used to monitor load on this
+ * bridge server and distribute load to the locators
+ * @since 5.7
+ */
+ private volatile LoadMonitor loadMonitor;
+
+ /**
+ * boolean that represents whether this server is a GatewayReceiver or a simple BridgeServer
+ */
+ private boolean isGatewayReceiver;
+
+ private List<GatewayTransportFilter> gatewayTransportFilters = Collections.EMPTY_LIST;
+
++ /** is this a server created by a launcher as opposed to by an application or XML? */
++ private boolean isDefaultServer;
++
+ /**
+ * Needed because this guy is an advisee
+ * @since 5.7
+ */
+ private int serialNumber; // changed on each start
+
+ public static final boolean ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE =
+ Boolean.getBoolean("gemfire.cache-server.enable-notify-by-subscription-false");
+
+
+ // ////////////////////// Constructors //////////////////////
+
+ /**
+ * Creates a new <code>CacheServerImpl</code> that serves the contents of
+ * the given <code>Cache</code>. Apart from the gateway-receiver flag it has
+ * the default configuration.
+ *
+ * @param cache the cache handed to the superclass constructor
+ * @param isGatewayReceiver whether this server is a GatewayReceiver rather
+ * than a simple cache server
+ */
+ public CacheServerImpl(GemFireCacheImpl cache, boolean isGatewayReceiver) {
+ super(cache);
+ this.isGatewayReceiver = isGatewayReceiver;
+ }
+
+ // //////////////////// Instance Methods ///////////////////
+
+ /**
+  * Returns the cancel criterion of the cache this server belongs to.
+  */
+ public CancelCriterion getCancelCriterion() {
+   return this.cache.getCancelCriterion();
+ }
+
+ /**
+  * Guard used by the configuration setters: rejects the call with an
+  * {@link IllegalStateException} when this cache server is running.
+  */
+ private void checkRunning() {
+   if (!isRunning()) {
+     return;
+   }
+   final String message = LocalizedStrings.CacheServerImpl_A_CACHE_SERVERS_CONFIGURATION_CANNOT_BE_CHANGED_ONCE_IT_IS_RUNNING.toLocalizedString();
+   throw new IllegalStateException(message);
+ }
+
+ /**
+  * Returns the gateway-receiver flag supplied to the constructor.
+  */
+ public boolean isGatewayReceiver() {
+   return isGatewayReceiver;
+ }
+
+ /**
+  * Returns the acceptor's port once an acceptor exists; until then the
+  * configured port held by the superclass is returned.
+  */
+ @Override
+ public int getPort() {
+   final AcceptorImpl current = this.acceptor;
+   return current != null ? current.getPort() : super.getPort();
+ }
+
+ /**
+ * Sets the configured port. Calls {@link #checkRunning()} first, so an
+ * {@link IllegalStateException} is thrown if the server is running.
+ */
+ @Override
+ public void setPort(int port) {
+ checkRunning();
+ super.setPort(port);
+ }
+
+ /**
+ * Sets the bind address. Calls {@link #checkRunning()} first, so an
+ * {@link IllegalStateException} is thrown if the server is running.
+ */
+ @Override
+ public void setBindAddress(String address) {
+ checkRunning();
+ super.setBindAddress(address);
+ }
+ /**
+ * Sets the host name for clients. Calls {@link #checkRunning()} first, so
+ * an {@link IllegalStateException} is thrown if the server is running.
+ */
+ @Override
+ public void setHostnameForClients(String name) {
+ checkRunning();
+ super.setHostnameForClients(name);
+ }
+
+ /**
+ * Sets the maximum number of connections. Calls {@link #checkRunning()}
+ * first, so an {@link IllegalStateException} is thrown if the server is
+ * running.
+ */
+ @Override
+ public void setMaxConnections(int maxCon) {
+ checkRunning();
+ super.setMaxConnections(maxCon);
+ }
+
+ /**
+ * Sets the maximum number of threads. Calls {@link #checkRunning()} first,
+ * so an {@link IllegalStateException} is thrown if the server is running.
+ */
+ @Override
+ public void setMaxThreads(int maxThreads) {
+ checkRunning();
+ super.setMaxThreads(maxThreads);
+ }
+
+ /**
+  * Stores the notify-by-subscription flag, but only when
+  * {@link #ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE} is set; otherwise the
+  * argument is ignored. Throws {@link IllegalStateException} (via
+  * {@link #checkRunning()}) if the server is running.
+  */
+ @Override
+ public void setNotifyBySubscription(boolean b) {
+   checkRunning();
+   if (!CacheServerImpl.ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE) {
+     // Switch is off: the requested value is deliberately dropped.
+     return;
+   }
+   this.notifyBySubscription = b;
+ }
+
+ /**
+ * Sets the maximum message count. Calls {@link #checkRunning()} first, so
+ * an {@link IllegalStateException} is thrown if the server is running.
+ */
+ @Override
+ public void setMaximumMessageCount(int maximumMessageCount) {
+ checkRunning();
+ super.setMaximumMessageCount(maximumMessageCount);
+ }
+
+ /**
+ * Stores the socket buffer size directly in the field. Unlike most setters
+ * in this class it does not call {@link #checkRunning()}.
+ */
+ @Override
+ public void setSocketBufferSize(int socketBufferSize) {
+ this.socketBufferSize = socketBufferSize;
+ }
+
+ /**
+  * Returns the currently stored socket buffer size.
+  */
+ @Override
+ public int getSocketBufferSize() {
+   return socketBufferSize;
+ }
+
+ /**
+ * Stores the maximum time between pings directly in the field. Unlike most
+ * setters in this class it does not call {@link #checkRunning()}.
+ */
+ @Override
+ public void setMaximumTimeBetweenPings(int maximumTimeBetweenPings) {
+ this.maximumTimeBetweenPings = maximumTimeBetweenPings;
+ }
+
+ /**
+  * Returns the currently stored maximum time between pings.
+  */
+ @Override
+ public int getMaximumTimeBetweenPings() {
+   return maximumTimeBetweenPings;
+ }
+
+
+ /**
+ * Sets the load poll interval. Calls {@link #checkRunning()} first, so an
+ * {@link IllegalStateException} is thrown if the server is running.
+ */
+ @Override
+ public void setLoadPollInterval(long loadPollInterval) {
+ checkRunning();
+ super.setLoadPollInterval(loadPollInterval);
+ }
+
+ /**
+  * Returns the currently stored maximum message count.
+  */
+ @Override
+ public int getMaximumMessageCount() {
+   return maximumMessageCount;
+ }
+
+ /**
+ * Sets the load probe. Calls {@link #checkRunning()} first, so an
+ * {@link IllegalStateException} is thrown if the server is running.
+ */
+ @Override
+ public void setLoadProbe(ServerLoadProbe loadProbe) {
+ checkRunning();
+ super.setLoadProbe(loadProbe);
+ }
+
+ /**
+ * Replaces the list of gateway transport filters. The given list is stored
+ * as-is (no copy) and is later passed to the acceptor created in
+ * {@link #start()}. No running check is performed.
+ */
+ public void setGatewayTransportFilter(
+ List<GatewayTransportFilter> transportFilters) {
+ this.gatewayTransportFilters = transportFilters;
+ }
+
+ /**
+  * Returns the currently stored message time-to-live.
+  */
+ @Override
+ public int getMessageTimeToLive() {
+   return messageTimeToLive;
+ }
+
+
+ /**
+  * Returns the client subscription configuration object of this server.
+  */
+ public ClientSubscriptionConfig getClientSubscriptionConfig() {
+   return clientSubscriptionConfig;
+ }
+
++ /**
++  * Returns <code>true</code> once {@link #setIsDefaultServer()} has been
++  * called on this server.
++  */
++ public boolean isDefaultServer() {
++   return this.isDefaultServer;
++ }
++
++ /**
++  * Marks this server as a default server. The flag can only be switched
++  * on; this class offers no way to clear it.
++  */
++ public void setIsDefaultServer() {
++   isDefaultServer = true;
++ }
++
+ /**
+  * Copies the configuration of <b>another</b> <code>CacheServer</code> into
+  * <b>this</b> one by calling this server's setters one by one, followed by
+  * the client subscription (HA overflow) settings.
+  *
+  * @param other the server whose configuration is copied
+  */
+ public void configureFrom(CacheServer other) {
+   setPort(other.getPort());
+   setBindAddress(other.getBindAddress());
+   setHostnameForClients(other.getHostnameForClients());
+   setMaxConnections(other.getMaxConnections());
+   setMaxThreads(other.getMaxThreads());
+   setNotifyBySubscription(other.getNotifyBySubscription());
+   setSocketBufferSize(other.getSocketBufferSize());
+   setTcpNoDelay(other.getTcpNoDelay());
+   setMaximumTimeBetweenPings(other.getMaximumTimeBetweenPings());
+   setMaximumMessageCount(other.getMaximumMessageCount());
+   setMessageTimeToLive(other.getMessageTimeToLive());
+   // The transaction time-to-live is not copied: it is not implemented in
+   // CacheServer for v6.6.
+   setGroups(other.getGroups());
+   setLoadProbe(other.getLoadProbe());
+   setLoadPollInterval(other.getLoadPollInterval());
+
+   // Client subscription settings (configuration of HA overflow).
+   final ClientSubscriptionConfig source = other.getClientSubscriptionConfig();
+   final ClientSubscriptionConfig target = this.getClientSubscriptionConfig();
+   target.setEvictionPolicy(source.getEvictionPolicy());
+   target.setCapacity(source.getCapacity());
+   final String sourceDiskStore = source.getDiskStoreName();
+   if (sourceDiskStore == null) {
+     // No disk store configured on the source: copy its overflow directory.
+     target.setOverflowDirectory(source.getOverflowDirectory());
+   } else {
+     target.setDiskStoreName(sourceDiskStore);
+   }
+ }
+
+ /**
+ * Starts this cache server. In order, it: assigns a fresh serial number,
+ * forces notify-by-subscription on when the dynamic region factory is open,
+ * creates the advisor and the load monitor, assembles the client
+ * subscription overflow attributes, creates and starts the
+ * {@link AcceptorImpl}, performs the advisor handshake, starts the load
+ * monitor, initializes the client health monitoring region, logs the
+ * configuration, makes sure the client membership listener is registered
+ * and, unless this is a gateway receiver, raises a CACHE_SERVER_START
+ * resource event.
+ * <p>
+ * NOTE(review): there is no check here for the server already running;
+ * confirm callers never invoke start() twice without a stop() in between.
+ *
+ * @throws IOException declared by this method; presumably raised while
+ * creating or starting the acceptor (not visible from here)
+ */
+ @Override
+ public synchronized void start() throws IOException {
+ Assert.assertTrue(this.cache != null);
+ boolean isSqlFabricSystem = ((GemFireCacheImpl)this.cache).isSqlfSystem();
+
+ // a new serial number is taken on every start
+ this.serialNumber = createSerialNumber();
+ if (DynamicRegionFactory.get().isOpen()) {
+ // force notifyBySubscription to be true so that meta info is pushed
+ // from servers to clients instead of invalidates.
+ if (!this.notifyBySubscription) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.CacheServerImpl_FORCING_NOTIFYBYSUBSCRIPTION_TO_SUPPORT_DYNAMIC_REGIONS));
+ this.notifyBySubscription = true;
+ }
+ }
+ this.advisor = CacheServerAdvisor.createCacheServerAdvisor(this);
+ this.loadMonitor = new LoadMonitor(loadProbe, maxConnections,
+ loadPollInterval, FORCE_LOAD_UPDATE_FREQUENCY,
+ advisor);
+ // Positional list handed to the acceptor:
+ // [0] eviction policy, [1] capacity, [2] configured port,
+ // [3] disk store name or overflow directory, [4] true when [3] is a disk store name
+ List overflowAttributesList = new LinkedList();
+ ClientSubscriptionConfig csc = this.getClientSubscriptionConfig();
+ overflowAttributesList.add(0, csc.getEvictionPolicy());
+ overflowAttributesList.add(1, Integer.valueOf(csc.getCapacity()));
+ overflowAttributesList.add(2, Integer.valueOf(this.port));
+ String diskStoreName = csc.getDiskStoreName();
+ if (diskStoreName != null) {
+ overflowAttributesList.add(3, diskStoreName);
+ overflowAttributesList.add(4, true); // indicator to use diskstore
+ } else {
+ overflowAttributesList.add(3, csc.getOverflowDirectory());
+ overflowAttributesList.add(4, false);
+ }
+
+ this.acceptor = new AcceptorImpl(getPort(),
+ getBindAddress(),
+ getNotifyBySubscription(),
+ getSocketBufferSize(),
+ getMaximumTimeBetweenPings(),
+ this.cache,
+ getMaxConnections(),
+ getMaxThreads(),
+ getMaximumMessageCount(),
+ getMessageTimeToLive(),
+ getTransactionTimeToLive(),
+ this.loadMonitor,
+ overflowAttributesList,
+ isSqlFabricSystem,
+ this.isGatewayReceiver,
+ this.gatewayTransportFilters, this.tcpNoDelay);
+
+ // from here on getPort() and getExternalAddress() consult the acceptor
+ this.acceptor.start();
+ this.advisor.handshake();
+ this.loadMonitor.start(new ServerLocation(getExternalAddress(),
+ getPort()), acceptor.getStats());
+
+ // TODO : Need to provide facility to enable/disable client health monitoring.
+ //Creating ClientHealthMonitoring region.
+ // Force initialization on current cache
+ if(cache instanceof GemFireCacheImpl) {
+ ClientHealthMonitoringRegion.getInstance((GemFireCacheImpl)cache);
+ }
+ this.cache.getLoggerI18n().config(LocalizedStrings.CacheServerImpl_CACHESERVER_CONFIGURATION___0, getConfig());
+
+ /*
+ * If a stopped cache server is restarted, the client membership listener
+ * has to be registered again. If the listener is still registered (as on
+ * the very first start()), it is not registered a second time.
+ */
+ ClientMembershipListener[] membershipListeners =
+ ClientMembership.getClientMembershipListeners();
+
+ boolean membershipListenerRegistered = false;
+ for (ClientMembershipListener membershipListener : membershipListeners) {
+ //just checking by reference as the listener instance is final
+ if (listener == membershipListener) {
+ membershipListenerRegistered = true;
+ break;
+ }
+ }
+
+ if (!membershipListenerRegistered) {
+ ClientMembership.registerClientMembershipListener(listener);
+ }
+
+ // gateway receivers do not raise the cache-server start event
+ if (!isGatewayReceiver) {
+ InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
+ .getDistributedSystem();
+ system.handleResourceEvent(ResourceEvent.CACHE_SERVER_START, this);
+ }
+
+ }
+
+
+ /**
+  * Returns the address on which external processes can contact this cache
+  * server. Equivalent to <code>getExternalAddress(true)</code>, so the
+  * server has to be running.
+  *
+  * @since 5.7
+  */
+ public String getExternalAddress() {
+   final boolean requireRunning = true;
+   return getExternalAddress(requireRunning);
+ }
+
+ /**
+  * Returns the address on which external processes can contact this cache
+  * server: the configured host name for clients when it is non-empty,
+  * otherwise the acceptor's external address, or <code>null</code> when no
+  * acceptor exists.
+  *
+  * @param checkServerRunning when <code>true</code>, a server that is not
+  *        running is rejected
+  * @throws IllegalStateException if <code>checkServerRunning</code> is set
+  *         and the server is not running (the cache's cancel criterion is
+  *         consulted first and may throw instead)
+  */
+ public String getExternalAddress(boolean checkServerRunning) {
+   if (checkServerRunning && !isRunning()) {
+     final String msg = "A bridge server's bind address is only available if it has been started";
+     // Give the cache's cancel criterion the chance to report first.
+     this.cache.getCancelCriterion().checkCancelInProgress(null);
+     throw new IllegalStateException(msg);
+   }
+
+   final String clientHost = this.hostnameForClients;
+   if (clientHost != null && !clientHost.equals("")) {
+     return clientHost;
+   }
+
+   final AcceptorImpl current = this.acceptor;
+   return current == null ? null : current.getExternalAddress();
+ }
+
+ /**
+  * Returns <code>true</code> when an acceptor exists and reports itself as
+  * running.
+  */
+ public boolean isRunning() {
+   final AcceptorImpl current = this.acceptor;
+   if (current == null) {
+     return false;
+   }
+   return current.isRunning();
+ }
+
+ /**
+  * Stops this cache server if it is running; otherwise does nothing.
+  * <p>
+  * The load monitor, the advisor and the acceptor are closed in that order.
+  * A {@link RuntimeException} from one of them is logged and does not keep
+  * the remaining components from being closed. If any close failed, the
+  * first such exception is rethrown after all three were attempted; in that
+  * case the follow-up cleanup (client membership listener, hosted client
+  * transaction states, CACHE_SERVER_STOP resource event) is not performed.
+  *
+  * @throws RuntimeException the first failure raised while closing the load
+  *         monitor, the advisor or the acceptor
+  */
+ public synchronized void stop() {
+   if (!isRunning()) {
+     return;
+   }
+
+   // Only the FIRST failure is kept; later ones are logged but must not
+   // replace it.
+   RuntimeException firstException = null;
+
+   try {
+     if (this.loadMonitor != null) {
+       this.loadMonitor.stop();
+     }
+   } catch (RuntimeException e) {
+     cache.getLoggerI18n().warning(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_LOAD_MONITOR, e);
+     firstException = e;
+   }
+
+   try {
+     if (this.advisor != null) {
+       this.advisor.close();
+     }
+   } catch (RuntimeException e) {
+     cache.getLoggerI18n().warning(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_ADVISOR, e);
+     if (firstException == null) {
+       firstException = e;
+     }
+   }
+
+   try {
+     if (this.acceptor != null) {
+       this.acceptor.close();
+     }
+   } catch (RuntimeException e) {
+     logger.warn(LocalizedMessage.create(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_ACCEPTOR_MONITOR), e);
+     // The original test was inverted ("!= null"): an acceptor-only failure
+     // was silently dropped and an earlier failure was overwritten.
+     if (firstException == null) {
+       firstException = e;
+     }
+   }
+
+   if (firstException != null) {
+     throw firstException;
+   }
+
+   //TODO : We need to clean up the admin region created for client
+   //monitoring.
+
+   // BridgeServer is still available, just not running, so we don't take
+   // it out of the cache's list...
+   // cache.removeBridgeServer(this);
+
+   /* Assuming start won't be called after stop */
+   ClientMembership.unregisterClientMembershipListener(listener);
+
+   TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager();
+   txMgr.removeHostedTXStatesForClients();
+
+   // gateway receivers do not raise the cache-server stop event
+   if (!isGatewayReceiver) {
+     InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
+         .getDistributedSystem();
+     system.handleResourceEvent(ResourceEvent.CACHE_SERVER_STOP, this);
+   }
+
+ }
+
+ /**
+  * Renders the effective configuration of this server as a single line of
+  * space-separated <code>name=value</code> pairs. The "overflow directory"
+  * entry shows the disk store name when one is configured, otherwise the
+  * overflow directory.
+  */
+ private String getConfig() {
+   final ClientSubscriptionConfig subscription = this.getClientSubscriptionConfig();
+   final StringBuilder text = new StringBuilder();
+   text.append("port=").append(getPort())
+       .append(" max-connections=").append(getMaxConnections())
+       .append(" max-threads=").append(getMaxThreads())
+       .append(" notify-by-subscription=").append(getNotifyBySubscription())
+       .append(" socket-buffer-size=").append(getSocketBufferSize())
+       .append(" maximum-time-between-pings=").append(getMaximumTimeBetweenPings())
+       .append(" maximum-message-count=").append(getMaximumMessageCount())
+       .append(" message-time-to-live=").append(getMessageTimeToLive())
+       .append(" eviction-policy=").append(subscription.getEvictionPolicy())
+       .append(" capacity=").append(subscription.getCapacity())
+       .append(" overflow directory=");
+
+   final String diskStore = subscription.getDiskStoreName();
+   text.append(diskStore != null ? diskStore : subscription.getOverflowDirectory());
+
+   text.append(" groups=").append(Arrays.asList(getGroups()))
+       .append(" loadProbe=").append(loadProbe)
+       .append(" loadPollInterval=").append(loadPollInterval)
+       .append(" tcpNoDelay=").append(tcpNoDelay);
+   return text.toString();
+ }
+
+ /**
+  * Describes this server by its port and its client subscription settings
+  * (eviction policy, capacity and either the overflow disk store or the
+  * overflow directory).
+  */
+ @Override
+ public String toString() {
+   final ClientSubscriptionConfig subscription = this.getClientSubscriptionConfig();
+   final StringBuilder text = new StringBuilder();
+   text.append("CacheServer on port=").append(getPort())
+       .append(" client subscription config policy=").append(subscription.getEvictionPolicy())
+       .append(" client subscription config capacity=").append(subscription.getCapacity());
+
+   final String diskStore = subscription.getDiskStoreName();
+   if (diskStore == null) {
+     text.append(" client subscription config overflow directory=")
+         .append(subscription.getOverflowDirectory());
+   } else {
+     text.append(" client subscription config overflow disk store=")
+         .append(diskStore);
+   }
+   return text.toString();
+ }
+
+ /**
+  * Test hook exposing the internal acceptor.
+  *
+  * @return the acceptor held by this server; <code>null</code> if none has
+  *         been created yet
+  */
+ public AcceptorImpl getAcceptor() {
+   return acceptor;
+ }
+
+ // DistributionAdvisee methods
+
+ /**
+  * Returns the distribution manager of the underlying distributed system.
+  */
+ public DM getDistributionManager() {
+   final InternalDistributedSystem system = getSystem();
+   return system.getDistributionManager();
+ }
+
+ /**
+  * Looks up the client proxy for the given durable client id through the
+  * acceptor's client notifier.
+  */
+ public ClientSession getClientSession(String durableClientId) {
+   final CacheClientNotifier notifier = getCacheClientNotifier();
+   return notifier.getClientProxy(durableClientId);
+ }
+
+ /**
+  * Looks up the client proxy for the given member: the member is translated
+  * to a client id, which is then resolved through the acceptor's client
+  * notifier.
+  */
+ public ClientSession getClientSession(DistributedMember member) {
+   final CacheClientNotifier notifier = getCacheClientNotifier();
+   return notifier.getClientProxy(ClientProxyMembershipID.getClientId(member));
+ }
+
+ /**
+  * Returns a new {@link HashSet} holding the client proxies currently known
+  * to the acceptor's client notifier.
+  */
+ public Set getAllClientSessions() {
+   final CacheClientNotifier notifier = getCacheClientNotifier();
+   return new HashSet(notifier.getClientProxies());
+ }
+
+ /**
+  * Creates the client subscription (client messages) region.
+  *
+  * @param cache the cache in which the region is created
+  * @param ePolicy the eviction policy of the region
+  * @param capacity the eviction capacity
+  * @param port the port used to derive the region name
+  * @param overFlowDir the overflow directory, or the disk store name when
+  *        <code>isDiskStore</code> is set
+  * @param isDiskStore whether <code>overFlowDir</code> names a disk store
+  * @return client subscription name
+  * @since 5.7
+  */
+ public static String clientMessagesRegion(GemFireCacheImpl cache, String ePolicy,
+     int capacity, int port, String overFlowDir, boolean isDiskStore) {
+   final RegionAttributes attributes = getAttribFactoryForClientMessagesRegion(
+       cache, ePolicy, capacity, overFlowDir, isDiskStore).create();
+   return createClientMessagesRegion(attributes, cache, capacity, port);
+ }
+
+ /**
+  * Builds the {@link AttributesFactory} for a client messages region: local
+  * scope, NORMAL data policy, statistics enabled, synchronous disk writes
+  * and LIFO eviction that overflows to disk.
+  *
+  * @param cache the cache used to create a disk store when an explicit
+  *        overflow directory is given
+  * @param ePolicy the eviction policy; has to equal one of the two HA
+  *        eviction policy constants checked below
+  * @param capacity the eviction capacity
+  * @param overflowDir the overflow directory, or the disk store name when
+  *        <code>isDiskStore</code> is set
+  * @param isDiskStore whether <code>overflowDir</code> names a disk store
+  * @return the configured factory
+  * @throws InvalidValueException if <code>ePolicy</code> is not recognized
+  * @throws GemFireIOException if the overflow directory cannot be created
+  */
+ public static AttributesFactory getAttribFactoryForClientMessagesRegion(
+     GemFireCacheImpl cache,
+     String ePolicy, int capacity, String overflowDir, boolean isDiskStore)
+     throws InvalidValueException, GemFireIOException {
+   final AttributesFactory attrs = new AttributesFactory();
+   attrs.setScope(Scope.LOCAL);
+
+   if (isDiskStore) {
+     // In this mode the overflowDir argument carries a disk store name.
+     attrs.setDiskStoreName(overflowDir);
+   } else if (overflowDir == null || overflowDir.equals(ClientSubscriptionConfig.DEFAULT_OVERFLOW_DIRECTORY)) {
+     // No explicit directory requested: no named disk store is set.
+     attrs.setDiskStoreName(null);
+   } else {
+     final String subDirectory = generateNameForClientMsgsRegion(OSProcess.getId());
+     final File overflow = new File(overflowDir + File.separatorChar + subDirectory);
+     // Registers the overflow directory for deletion when the JVM terminates.
+     overflow.deleteOnExit();
+     if (!overflow.mkdirs() && !overflow.isDirectory()) {
+       throw new GemFireIOException("Could not create client subscription overflow directory: "
+           + overflow.getAbsolutePath());
+     }
+     cache.createDiskStoreFactory()
+         .setAutoCompact(true)
+         .setDiskDirsAndSizes(new File[] { overflow }, new int[] { Integer.MAX_VALUE })
+         .create("bsi");
+     attrs.setDiskStoreName("bsi");
+   }
+   // Every branch above ends with synchronous disk writes (the flag is
+   // true); see feature request #41479 for background.
+   attrs.setDiskSynchronous(true);
+
+   attrs.setDataPolicy(DataPolicy.NORMAL);
+   // enable statistics
+   attrs.setStatisticsEnabled(true);
+
+   /* LIFO related eviction attributes */
+   final boolean evictByEntry = HARegionQueue.HA_EVICTION_POLICY_ENTRY.equals(ePolicy);
+   final boolean evictByMemory = !evictByEntry
+       && HARegionQueue.HA_EVICTION_POLICY_MEMORY.equals(ePolicy);
+   if (!evictByEntry && !evictByMemory) {
+     // neither of the supported eviction policies
+     throw new InvalidValueException(
+         LocalizedStrings.CacheServerImpl__0_INVALID_EVICTION_POLICY.toLocalizedString(ePolicy));
+   }
+   if (evictByEntry) {
+     attrs.setEvictionAttributes(EvictionAttributesImpl.createLIFOEntryAttributes(
+         capacity, EvictionAction.OVERFLOW_TO_DISK));
+   } else {
+     attrs.setEvictionAttributes(EvictionAttributesImpl.createLIFOMemoryAttributes(
+         capacity, EvictionAction.OVERFLOW_TO_DISK));
+   }
+   return attrs;
+ }
+
+ /**
+  * Creates the client messages region with the given attributes under a
+  * name derived from <code>port</code>.
+  *
+  * @param attr the region attributes
+  * @param cache the cache in which the region is created
+  * @param capacity not used by this method
+  * @param port the port used to derive the region name
+  * @return the name of the created region
+  * @throws InternalGemFireError wrapping any RegionExistsException,
+  *         IOException or ClassNotFoundException raised by the creation
+  */
+ public static String createClientMessagesRegion(RegionAttributes attr,
+     GemFireCacheImpl cache, int capacity, int port) {
+   // generating unique name in VM for ClientMessagesRegion
+   final String regionName = generateNameForClientMsgsRegion(port);
+   final Exception failure;
+   try {
+     cache.createVMRegion(regionName, attr,
+         new InternalRegionArguments().setDestroyLockFlag(true)
+             .setRecreateFlag(false).setSnapshotInputStream(null)
+             .setImageTarget(null).setIsUsedForMetaRegion(true));
+     return regionName;
+   } catch (RegionExistsException ree) {
+     failure = ree;
+   } catch (IOException e) {
+     // only if loading snapshot, not here
+     failure = e;
+   } catch (ClassNotFoundException e) {
+     // only if loading snapshot, not here
+     failure = e;
+   }
+   // All three failures are reported the same way.
+   final InternalGemFireError assErr = new InternalGemFireError("unexpected exception");
+   assErr.initCause(failure);
+   throw assErr;
+ }
+
+ /**
+  * Test variant of {@link #clientMessagesRegion}: additionally gives the
+  * region an entry time-to-live of <code>expiryTime</code> with the
+  * LOCAL_INVALIDATE expiration action.
+  *
+  * @return the name of the created region
+  */
+ public static String createClientMessagesRegionForTesting(GemFireCacheImpl cache,
+     String ePolicy, int capacity, int port, int expiryTime, String overFlowDir, boolean isDiskStore) {
+   final AttributesFactory attrs = getAttribFactoryForClientMessagesRegion(cache,
+       ePolicy, capacity, overFlowDir, isDiskStore);
+   attrs.setEntryTimeToLive(
+       new ExpirationAttributes(expiryTime, ExpirationAction.LOCAL_INVALIDATE));
+   return createClientMessagesRegion(attrs.create(), cache, capacity, port);
+ }
+
+ /**
+  * Generates the client subscription name for the given id: the
+  * CLIENT_SUBSCRIPTION prefix, an underscore and the id.
+  *
+  * @param id the id appended to the prefix
+  * @return the generated name
+  * @since 5.7
+  */
+ public static String generateNameForClientMsgsRegion(int id) {
+   final String prefix = ClientSubscriptionConfigImpl.CLIENT_SUBSCRIPTION;
+   return prefix + "_" + id;
+ }
+
+ /*
+ * Marker class name to identify the lock more easily in thread dumps private
+ * static class ClientMessagesRegionLock extends Object { }
+ */
+ /** Returns this server's advisor, typed as a plain DistributionAdvisor. */
+ public DistributionAdvisor getDistributionAdvisor() {
+   return advisor;
+ }
+
+ /**
+  * Returns the <code>CacheServerAdvisor</code> of this server.
+  */
+ public CacheServerAdvisor getCacheServerAdvisor() {
+   return advisor;
+ }
+
+ /** Asks this server's distribution advisor to create a profile. */
+ public Profile getProfile() {
+   final DistributionAdvisor currentAdvisor = getDistributionAdvisor();
+   return currentAdvisor.createProfile();
+ }
+
+ /** Always returns <code>null</code>: a cache server has no parent advisee. */
+ public DistributionAdvisee getParentAdvisee() {
+ return null;
+ }
+
+ /**
+  * Returns the distributed system of the cache, cast to
+  * <code>InternalDistributedSystem</code>.
+  *
+  * @return the underlying <code>InternalDistributedSystem</code>
+  */
+ public InternalDistributedSystem getSystem() {
+   return (InternalDistributedSystem) cache.getDistributedSystem();
+ }
+
+ /** Returns the fixed name <code>"CacheServer"</code>. */
+ public String getName() {
+ return "CacheServer";
+ }
+
+ /** Returns the same value as {@link #getName()}. */
+ public String getFullPath() {
+   return this.getName();
+ }
+
+ private final static AtomicInteger profileSN = new AtomicInteger();
+
+ /** Returns the next value of the class-wide serial number counter. */
+ private static int createSerialNumber() {
+   return profileSN.addAndGet(1);
+ }
+
+ /**
+  * Returns all groups of this cache server without duplicates: first the
+  * groups parsed from the distributed system's configuration, then the
+  * groups explicitly set on this server, each in encounter order.
+  */
+ public String[] getCombinedGroups() {
+   final ArrayList<String> combined = new ArrayList<String>();
+   for (String systemGroup : MemberAttributes.parseGroups(null, getSystem().getConfig().getGroups())) {
+     if (combined.contains(systemGroup)) {
+       continue;
+     }
+     combined.add(systemGroup);
+   }
+   for (String serverGroup : getGroups()) {
+     if (combined.contains(serverGroup)) {
+       continue;
+     }
+     combined.add(serverGroup);
+   }
+   return combined.toArray(new String[combined.size()]);
+ }
+
+ /**
+  * Populates the given profile, which has to be a
+  * <code>CacheServerProfile</code>, with this server's host, port, combined
+  * groups, maximum connections, last load, load poll interval and serial
+  * number, then finishes its initialization. Deliberately not synchronized
+  * (see the inline marker).
+  */
+ public /*synchronized causes deadlock*/ void fillInProfile(Profile profile) {
+   assert profile instanceof CacheServerProfile;
+   final CacheServerProfile serverProfile = (CacheServerProfile) profile;
+   // false: the address is needed even when the server is not running
+   serverProfile.setHost(this.getExternalAddress(false));
+   serverProfile.setPort(this.getPort());
+   serverProfile.setGroups(this.getCombinedGroups());
+   serverProfile.setMaxConnections(this.maxConnections);
+   serverProfile.setInitialLoad(this.loadMonitor.getLastLoad());
+   serverProfile.setLoadPollInterval(this.getLoadPollInterval());
+   serverProfile.serialNumber = this.getSerialNumber();
+   serverProfile.finishInit();
+ }
+
+ /** Returns the serial number assigned by the most recent start. */
+ public int getSerialNumber() {
+   return serialNumber;
+ }
+
+
+ /**
+  * Returns the client notifier of the acceptor. The acceptor is
+  * dereferenced without a null check.
+  */
+ protected CacheClientNotifier getCacheClientNotifier() {
+   final AcceptorImpl current = getAcceptor();
+   return current.getCacheClientNotifier();
+ }
+
+ /**
+  * Adds an <code>InterestRegistrationListener</code> to the set kept by the
+  * acceptor's client notifier.
+  *
+  * @param listener
+  *          The <code>InterestRegistrationListener</code> to register
+  * @throws IllegalStateException if this cache server is not running
+  * @since 5.8Beta
+  */
+ public void registerInterestRegistrationListener(
+     InterestRegistrationListener listener) {
+   final boolean running = this.isRunning();
+   if (!running) {
+     final String message = LocalizedStrings.CacheServerImpl_MUST_BE_RUNNING.toLocalizedString();
+     throw new IllegalStateException(message);
+   }
+   final CacheClientNotifier notifier = getCacheClientNotifier();
+   notifier.registerInterestRegistrationListener(listener);
+ }
+
+ /**
+  * Removes an <code>InterestRegistrationListener</code> from the set kept by
+  * the acceptor's client notifier. Unlike registration, no running check is
+  * made here.
+  *
+  * @param listener
+  *          The <code>InterestRegistrationListener</code> to unregister
+  *
+  * @since 5.8Beta
+  */
+ public void unregisterInterestRegistrationListener(
+     InterestRegistrationListener listener) {
+   final CacheClientNotifier notifier = getCacheClientNotifier();
+   notifier.unregisterInterestRegistrationListener(listener);
+ }
+
+ /**
+  * Returns the set of <code>InterestRegistrationListener</code>s reported
+  * by the acceptor's client notifier.
+  *
+  * @return a read-only set of <code>InterestRegistrationListener</code>s
+  *         registered with this notifier
+  *
+  * @since 5.8Beta
+  */
+ public Set getInterestRegistrationListeners() {
+   final CacheClientNotifier notifier = getCacheClientNotifier();
+   return notifier.getInterestRegistrationListeners();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java
index ac198e9,0000000..c5de84b
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java
@@@ -1,1391 -1,0 +1,1393 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.OSProcess;
+import com.gemstone.gemfire.internal.PureJavaMode;
+import com.gemstone.gemfire.internal.SocketCreator;
++import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
- import com.gemstone.gemfire.internal.logging.PureLogWriter;
+import com.gemstone.gemfire.internal.process.StartupStatus;
+import com.gemstone.gemfire.internal.process.StartupStatusListener;
+import com.gemstone.gemfire.internal.util.IOUtils;
+import com.gemstone.gemfire.internal.util.JavaCommandBuilder;
+
+/**
+ * Launcher program to start a cache server.
+ *
+ * @author Sudhir Menon
+ * @author Barry Oglesby
+ * @author David Whitlock
+ * @author John Blum
+ *
+ * @since 2.0.2
+ */
+public class CacheServerLauncher {
+
+ /**
+ * Is this VM a dedicated Cache Server? This value is used mainly by the admin API.
+ * Initialised from the "gemfire.isDedicatedServer" system property and forced to
+ * <code>true</code> by {@link #server}.
+ */
+ public static boolean isDedicatedCacheServer = Boolean.getBoolean("gemfire.isDedicatedServer");
+
+ /**
+ * When <code>true</code>, {@link #server} assigns buckets for every partitioned
+ * region of the cache once the server has been marked as running.
+ */
+ public static boolean ASSIGN_BUCKETS = Boolean.getBoolean("gemfire.CacheServerLauncher.assignBucketsToPartitions");
+
+ // Default is to exit if the property is not defined. When true, start() and stop()
+ // return to their caller instead of calling System.exit().
+ public static boolean DONT_EXIT_AFTER_LAUNCH = Boolean.getBoolean("gemfire.CacheServerLauncher.dontExitAfterLaunch");
+
+ /**
+ * Should the launch command be printed? Note that the property name is built from the
+ * simple class name and therefore carries no "gemfire." prefix, unlike the other
+ * launcher properties.
+ */
+ public static final boolean PRINT_LAUNCH_COMMAND = Boolean.getBoolean(
+ CacheServerLauncher.class.getSimpleName() + ".PRINT_LAUNCH_COMMAND");
+
+ // Defaults to 15000; the property name suggests milliseconds.
+ // NOTE(review): presumably the time to wait for the server status to change - confirm against its users.
+ private static final long STATUS_WAIT_TIME
+ = Long.getLong("gemfire.CacheServerLauncher.STATUS_WAIT_TIME_MS", 15000);
+
+ /** How long to wait for a cache server to stop (defaults to 20000; property name suggests milliseconds) */
+ private static final long SHUTDOWN_WAIT_TIME
+ = Long.getLong("gemfire.CacheServerLauncher.SHUTDOWN_WAIT_TIME_MS", 20000);
+
+ // Name given to the constructor, plus the file names the constructor derives from it.
+ protected final String baseName;
+ protected final String defaultLogFileName;
+ protected final String startLogFileName;
+ protected final String statusName;
+ // Most recent status created or read by this launcher (see server() and stop()).
+ protected Status status = null;
+ // Directory in which the status file and start log are looked up.
+ protected File workingDir = null;
+ // Streams captured when the launcher is constructed; restoreStdOut() reinstates them.
+ protected PrintStream oldOut = System.out;
+ protected PrintStream oldErr = System.err;
+ // Assigned by server() from the connected distributed system; null until then.
+ protected LogWriterI18n logger = null;
+ // Values of -J-Xmx / -J-Xms as seen by getStartOptions().
+ protected String maxHeapSize;
+ protected String initialHeapSize;
+ // NOTE(review): not assigned anywhere in the code visible here - presumably set by subclasses or later code.
+ protected String offHeapSize;
+
+
+ // Values stored in the state field of a Status.
+ public static final int SHUTDOWN = 0;
+ public static final int STARTING = 1;
+ public static final int RUNNING = 2;
+ public static final int SHUTDOWN_PENDING = 3;
+
+ // Number of polling iterations after which server() re-reads the status file
+ // even if its modification time has not changed.
+ private static final int FORCE_STATUS_FILE_READ_ITERATION_COUNT = 10;
+
+   /**
+    * Creates a launcher whose file names are derived from the given base name.
+    * The base name is lower-cased and stripped of spaces, then used to build the
+    * start log ({@code start_NAME.log}), default log ({@code NAME.log}) and
+    * status file ({@code .NAME.ser}) names.
+    *
+    * @param baseName the name of the server being launched; must not be null
+    */
+   public CacheServerLauncher(final String baseName) {
+     assert baseName != null : "The base name used for the cache server launcher files cannot be null!";
+     this.baseName = baseName;
+
+     final String fileStem = baseName.toLowerCase().replace(" ", "");
+     this.statusName = "." + fileStem + ".ser";
+     this.defaultLogFileName = fileStem + ".log";
+     this.startLogFileName = "start_" + fileStem + ".log";
+   }
+
+ protected static Status createStatus(final String baseName, final int state, final int pid) {
+ return createStatus(baseName, state, pid, null, null);
+ }
+
+   /**
+    * Builds a {@code Status} for the given base name and fills in its state,
+    * process id, message and exception fields.
+    */
+   protected static Status createStatus(final String baseName, final int state, final int pid, final String msg, final Throwable t) {
+     final Status created = new Status(baseName);
+     created.exception = t;
+     created.msg = msg;
+     created.pid = pid;
+     created.state = state;
+     return created;
+   }
+
+ /**
+ * Prints usage information about this program.
+ */
+ protected void usage() {
+ PrintStream out = System.out;
+ out.println("cacheserver start [-J<vmarg>]* [<attName>=<attValue>]* [-dir=<workingdir>] [-classpath=<classpath>] [-disable-default-server] [-rebalance] [-lock-memory] [-server-port=<server-port>] [-server-bind-address=<server-bind-address>] [-critical-heap-percentage=<critical-heap-percentage>] [-eviction-heap-percentage=<eviction-heap-percentage>] [-critical-off-heap-percentage=<critical-off-heap-percentage>] [-eviction-off-heap-percentage=<eviction-off-heap-percentage>]\n" );
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_STARTS_A_GEMFIRE_CACHESERVER_VM.toLocalizedString() );
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_VMARG.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_DIR.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_CLASSPATH.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_ATTNAME.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_REBALANCE.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_DISABLE_DEFAULT_SERVER.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_SERVER_PORT.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_SERVER_BIND_ADDRESS.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_CRITICAL_HEAP_PERCENTAGE.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_EVICTION_HEAP_PERCENTAGE.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_CRITICAL_OFF_HEAP_PERCENTAGE.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_EVICTION_OFF_HEAP_PERCENTAGE.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_LOCK_MEMORY.toLocalizedString());
+
+ out.println();
+ out.println( "cacheserver stop [-dir=<workingdir>]" );
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_STOPS_A_GEMFIRE_CACHESERVER_VM.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_DIR.toLocalizedString());
+ out.println();
+ out.println( "cacheserver status [-dir=<workingdir>]" );
+ out.println( "\t" + LocalizedStrings.CacheServerLauncher_STATUS.toLocalizedString());
+ out.println("\t" + LocalizedStrings.CacheServerLauncher_DIR.toLocalizedString());
+ }
+
+ /**
+ * Prints the status of the cache server running the configured
+ * working directory.
+ */
+ protected void status(final String[] args) throws Exception {
+ workingDir = (File) getStopOptions(args).get(DIR);
+ System.out.println(getStatus());
+ System.exit(0);
+ }
+
+ /**
+ * Returns the <code>Status</code> of the cache server in the
+ * <code>workingDir</code>.
+ */
+ protected Status getStatus() throws Exception {
+ Status status;
+
+ if (new File(workingDir, statusName).exists()) {
+ status = spinReadStatus(); // See bug 32456
+ }
+ else {
+ // no pid since the cache server is not running
+ status = createStatus(this.baseName, SHUTDOWN, 0);
+ }
+
+ return status;
+ }
+
+ /**
+ * Main method that parses the command line and will start, stop, or
+ * get the status of a cache server. This main method is also the
+ * main method of the launched cache server VM ("server" mode).
+ *
+ * The first argument selects the command (compared case-insensitively);
+ * a missing or unrecognised command prints the usage and exits with code 1.
+ *
+ * @param args the command followed by its options
+ */
+ public static void main(final String[] args) {
+ final CacheServerLauncher launcher = new CacheServerLauncher("CacheServer");
+ // Remembers whether a failure happened in "server" mode so it can be recorded via setServerError.
+ boolean inServer = false;
+
+ try {
+ if (args.length > 0) {
+ if (args[0].equalsIgnoreCase("start")) {
+ launcher.start(args);
+ }
+ else if (args[0].equalsIgnoreCase("server")) {
+ inServer = true;
+ launcher.server(args);
+ }
+ else if (args[0].equalsIgnoreCase("stop")) {
+ launcher.stop(args);
+ }
+ else if (args[0].equalsIgnoreCase("status")) {
+ launcher.status(args);
+ }
+ else {
+ launcher.usage();
+ System.exit(1);
+ }
+ }
+ else {
+ launcher.usage();
+ System.exit(1);
+ }
+
+ // Only reached when a command returned instead of exiting the VM. start() and stop()
+ // do return when DONT_EXIT_AFTER_LAUNCH is set, in which case this exception is
+ // handled by the catch (Throwable) block below and the VM exits with code 1.
+ throw new Exception(LocalizedStrings.CacheServerLauncher_INTERNAL_ERROR_SHOULDNT_REACH_HERE.toLocalizedString());
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable t) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ t.printStackTrace();
+ if (inServer) {
+ launcher.setServerError(LocalizedStrings.CacheServerLauncher_ERROR_STARTING_SERVER_PROCESS
+ .toLocalizedString(), t);
+ }
+ // Put back the original stdout/stderr before reporting the failure.
+ launcher.restoreStdOut();
+ // The logger is only assigned once server() has connected; fall back to stdout otherwise.
+ if (launcher.logger != null) {
+ launcher.logger.severe(LocalizedStrings.CacheServerLauncher_CACHE_SERVER_ERROR, t);
+
+ }
+ else {
+ System.out.println(LocalizedStrings.CacheServerLauncher_ERROR_0.toLocalizedString(t.getMessage()));
+ }
+ System.exit(1);
+ }
+ }
+
+ protected void restoreStdOut( ) {
+ System.setErr( oldErr );
+ System.setOut( oldOut );
+ }
+
+ // Keys of the option maps built by getStartOptions(), getServerOptions() and getStopOptions().
+ // Several of them double as the command-line option name (prefixed with "-").
+ protected static final String DIR = "dir";
+ protected static final String VMARGS = "vmargs";
+ protected static final String PROPERTIES = "properties";
+ protected static final String CLASSPATH = "classpath";
+ protected static final String REBALANCE = "rebalance";
+ protected static final String SERVER_PORT = "server-port";
+ protected static final String SERVER_BIND_ADDRESS = "server-bind-address";
+ protected static final String DISABLE_DEFAULT_SERVER = "disable-default-server";
+ // The four percentage keys are matched on the command line as "-" + key.
+ public static final String CRITICAL_HEAP_PERCENTAGE =
+ "critical-heap-percentage";
+ public static final String EVICTION_HEAP_PERCENTAGE =
+ "eviction-heap-percentage";
+ public static final String CRITICAL_OFF_HEAP_PERCENTAGE =
+ "critical-off-heap-percentage";
+ public static final String EVICTION_OFF_HEAP_PERCENTAGE =
+ "eviction-off-heap-percentage";
+ // NOTE(review): not used as a map key in the code visible here; the option parsers store
+ // DistributionConfig.LOCK_MEMORY_NAME in the properties instead - confirm where this is read.
+ protected static final String LOCK_MEMORY = "lock-memory";
+
+   /**
+    * Validates the directory named on the command line and records it in the
+    * option map under {@link #DIR}.
+    *
+    * @param options the option map to update
+    * @param dirValue the path given after <code>-dir=</code>
+    * @return the directory as a <code>File</code>
+    * @throws FileNotFoundException if the path does not exist
+    */
+   protected final File processDirOption(final Map<String, Object> options, final String dirValue) throws FileNotFoundException {
+     final File requestedDirectory = new File(dirValue);
+
+     if (requestedDirectory.exists()) {
+       options.put(DIR, requestedDirectory);
+       return requestedDirectory;
+     }
+
+     throw new FileNotFoundException(LocalizedStrings.CacheServerLauncher_THE_INPUT_WORKING_DIRECTORY_DOES_NOT_EXIST_0
+         .toLocalizedString(dirValue));
+   }
+
+ /**
+ * Populates a map that maps the name of the start options such as {@link #DIR} to its value on the command line.
+ * If no value is specified on the command line, a default one is provided.
+ *
+ * The returned map always contains {@link #DIR} (defaulting to the "user.dir" system property),
+ * {@link #VMARGS} (a list of JVM arguments) and {@link #PROPERTIES} (a <code>Properties</code> object
+ * holding the plain "key=value" arguments). As a side effect, <code>-J-Xmx</code> and <code>-J-Xms</code>
+ * values are recorded in {@link #maxHeapSize} and {@link #initialHeapSize}.
+ *
+ * @param args the command line, including the leading "start" command
+ * @return the populated option map
+ * @throws IllegalArgumentException for an argument that matches none of the known forms
+ */
+ protected Map<String, Object> getStartOptions(String[] args) throws Exception {
+ final Map<String, Object> options = new HashMap<String, Object>();
+ options.put(DIR, new File(System.getProperty("user.dir")));
+
+ final List<String> vmArgs = new ArrayList<String>();
+ options.put(VMARGS, vmArgs);
+
+ final Properties props = new Properties();
+ options.put(PROPERTIES, props);
+
+ // The branches below are order-sensitive: most options are recognised by prefix only,
+ // so e.g. any argument beginning with "-rebalance" is treated as the rebalance flag.
+ for (final String arg : args) {
+ if (arg.equals("start")) {
+ // expected
+ }
+ else if (arg.startsWith("-classpath=")) {
+ options.put(CLASSPATH, arg.substring(arg.indexOf("=") + 1));
+ }
+ else if (arg.startsWith("-dir=")) {
+ processDirOption(options, arg.substring(arg.indexOf("=") + 1));
+ }
+ else if (arg.startsWith("-disable-default-server")) {
+ // The whole argument (not a Boolean) is stored here, unlike in getServerOptions().
+ options.put(DISABLE_DEFAULT_SERVER, arg);
+ }
+ else if (arg.startsWith("-lock-memory")) {
+ if (System.getProperty("os.name").indexOf("Windows") >= 0) {
+ throw new IllegalArgumentException("Unable to lock memory on this operating system");
+ }
+ props.put(DistributionConfig.LOCK_MEMORY_NAME, "true");
+ }
+ else if (arg.startsWith("-rebalance")) {
+ options.put(REBALANCE, Boolean.TRUE);
+ }
+ // For the following options the complete argument, including the "-name=" part, is stored.
+ // NOTE(review): presumably so it can be forwarded verbatim to the server VM - confirm in addToServerCommand.
+ else if (arg.startsWith("-server-port")) {
+ options.put(SERVER_PORT, arg);
+ }
+ else if (arg.startsWith("-" + CRITICAL_HEAP_PERCENTAGE) ) {
+ options.put(CRITICAL_HEAP_PERCENTAGE, arg);
+ }
+ else if (arg.startsWith("-" + EVICTION_HEAP_PERCENTAGE) ) {
+ options.put(EVICTION_HEAP_PERCENTAGE, arg);
+ }
+ else if (arg.startsWith("-" + CRITICAL_OFF_HEAP_PERCENTAGE) ) {
+ options.put(CRITICAL_OFF_HEAP_PERCENTAGE, arg);
+ }
+ else if (arg.startsWith("-" + EVICTION_OFF_HEAP_PERCENTAGE) ) {
+ options.put(EVICTION_OFF_HEAP_PERCENTAGE, arg);
+ }
+ else if (arg.startsWith("-server-bind-address")) {
+ options.put(SERVER_BIND_ADDRESS, arg);
+ }
+ else if (arg.startsWith("-J")) {
+ // Strip the "-J" marker; what remains is passed to the JVM as-is.
+ String vmArg = arg.substring(2);
+ if (vmArg.startsWith("-Xmx")) {
+ this.maxHeapSize = vmArg.substring(4);
+ } else if (vmArg.startsWith("-Xms")) {
+ this.initialHeapSize = vmArg.substring(4);
+ }
+ vmArgs.add(vmArg);
+ }
+ // moved this default block down so that "-J" like options can have '=' in them.
+ // an 'indexOf' the assignment operator with greater than 0 ensures a non-empty String key value
+ else if (arg.indexOf("=") > 0) {
+ final int assignmentIndex = arg.indexOf("=");
+ final String key = arg.substring(0, assignmentIndex);
+ final String value = arg.substring(assignmentIndex + 1);
+
+ // "-key=value" goes to the overridable option hook, "key=value" to the property hook.
+ if (key.startsWith("-")) {
+ processStartOption(key.substring(1), value, options, vmArgs, props);
+ }
+ else {
+ processStartArg(key, value, options, vmArgs, props);
+ }
+ }
+ else {
+ throw new IllegalArgumentException(LocalizedStrings.CacheServerLauncher_UNKNOWN_ARGUMENT_0
+ .toLocalizedString(arg));
+ }
+ }
+
+ // -J-Djava.awt.headless=true has been added for Mac platform where it
+ // causes an icon to appear for sqlf launched procs
+ // TODO: check which library/GemFire code causes awt to be touched
+ vmArgs.add("-Djava.awt.headless=true");
+
+ // configure commons-logging to use JDK logging
+ vmArgs.add("-Dorg.apache.commons.logging.Log=org.apache.commons.logging.impl.Jdk14Logger");
+
+ // Re-stores the same list instance that was already put under VMARGS above.
+ options.put(VMARGS, vmArgs);
+ return options;
+ }
+
+   /**
+    * Handles a start argument of the form "key=value" by recording it as a
+    * property. The remaining parameters are not used by this implementation.
+    */
+   protected void processStartArg(final String key, final String value,
+       final Map<String, Object> options, final List<String> vmArgs, final Properties props)
+       throws Exception {
+     props.setProperty(key, value);
+   }
+
+   /**
+    * Handles a start option of the form "-key=value" (the key arrives without
+    * its leading dash). This implementation treats every such option as
+    * unknown and hands it to {@link #processUnknownStartOption}.
+    */
+   protected void processStartOption(final String key, final String value,
+       final Map<String, Object> options, final List<String> vmArgs, final Properties props)
+       throws Exception {
+     processUnknownStartOption(key, value, options, vmArgs, props);
+   }
+
+ /**
+ * Process a command-line option of the form "-key=value" unknown to the base class.
+ */
+ protected void processUnknownStartOption(final String key,
+ final String value,
+ final Map<String, Object> options,
+ final List<String> vmArgs,
+ final Properties props) {
+ throw new IllegalArgumentException(LocalizedStrings.CacheServerLauncher_UNKNOWN_ARGUMENT_0.toLocalizedString(key));
+ }
+
+ /**
+ * Extracts configuration information used when launching the cache server VM.
+ *
+ * The returned map always contains {@link #DIR} (defaulting to ".") and {@link #PROPERTIES}.
+ * As a side effect {@link #workingDir} is set to the chosen directory.
+ *
+ * @param args the command line, including the leading "server" command
+ * @return the populated option map
+ * @throws IllegalArgumentException for an argument that matches none of the known forms
+ */
+ protected Map<String, Object> getServerOptions(final String[] args) throws Exception {
+ final Map<String, Object> options = new HashMap<String, Object>();
+ options.put(DIR, new File("."));
+ workingDir = (File) options.get(DIR);
+
+ final Properties props = new Properties();
+ options.put(PROPERTIES, props);
+
+ // The branches are order-sensitive: options are recognised by prefix. For the valued
+ // options only the text after the first "=" is stored; when an argument has no "=",
+ // indexOf returns -1 and the whole argument is stored instead.
+ for (final String arg : args) {
+ if (arg.equals("server")) {
+ // expected
+ }
+ else if (arg.startsWith("-dir=")) {
+ this.workingDir = processDirOption(options, arg.substring(arg.indexOf("=") + 1));
+ }
+ else if (arg.startsWith("-rebalance")) {
+ options.put(REBALANCE, Boolean.TRUE);
+ }
+ else if (arg.startsWith("-disable-default-server")) {
+ options.put(DISABLE_DEFAULT_SERVER, Boolean.TRUE);
+ }
+ else if (arg.startsWith("-lock-memory")) {
+ props.put(DistributionConfig.LOCK_MEMORY_NAME, "true");
+ }
+ else if (arg.startsWith("-server-port")) {
+ options.put(SERVER_PORT, arg.substring(arg.indexOf("=") + 1));
+ }
+ else if (arg.startsWith("-server-bind-address")) {
+ options.put(SERVER_BIND_ADDRESS, arg.substring(arg.indexOf("=") + 1));
+ }
+ else if (arg.startsWith("-" + CRITICAL_HEAP_PERCENTAGE)) {
+ options.put(CRITICAL_HEAP_PERCENTAGE, arg.substring(arg.indexOf("=") + 1));
+ }
+ else if (arg.startsWith("-" + EVICTION_HEAP_PERCENTAGE)) {
+ options.put(EVICTION_HEAP_PERCENTAGE, arg.substring(arg.indexOf("=") + 1));
+ }
+ else if (arg.startsWith("-" + CRITICAL_OFF_HEAP_PERCENTAGE)) {
+ options.put(CRITICAL_OFF_HEAP_PERCENTAGE, arg.substring(arg.indexOf("=") + 1));
+ }
+ else if (arg.startsWith("-" + EVICTION_OFF_HEAP_PERCENTAGE)) {
+ options.put(EVICTION_OFF_HEAP_PERCENTAGE, arg.substring(arg.indexOf("=") + 1));
+ }
+ // NOTE(review): this requires the "=" at index 2 or later, whereas getStartOptions() accepts
+ // index 1 or later, so a one-character property key is accepted by "start" but rejected here.
+ // Confirm whether the difference is intentional.
+ else if (arg.indexOf("=") > 1) {
+ final int assignmentIndex = arg.indexOf("=");
+ final String key = arg.substring(0, assignmentIndex);
+ final String value = arg.substring(assignmentIndex + 1);
+
+ // Any other "-key=value" is stored in the option map without validation;
+ // plain "key=value" becomes a property.
+ if (key.startsWith("-")) {
+ options.put(key.substring(1), value);
+ }
+ else {
+ props.setProperty(key, value);
+ }
+ }
+ else {
+ throw new IllegalArgumentException(LocalizedStrings.CacheServerLauncher_UNKNOWN_ARGUMENT_0.toLocalizedString(arg));
+ }
+ }
+
+ return options;
+ }
+
+   /**
+    * Builds the option map for the stop and status commands. The only option
+    * understood is <code>-dir=</code>; without it {@link #DIR} maps to ".".
+    *
+    * @param args the command line, including the leading "stop" or "status" command
+    * @return a map containing the {@link #DIR} entry
+    * @throws IllegalArgumentException for any other argument
+    */
+   protected Map<String, Object> getStopOptions(final String[] args) throws Exception {
+     final Map<String, Object> stopOptions = new HashMap<String, Object>();
+     stopOptions.put(DIR, new File("."));
+
+     for (final String argument : args) {
+       if (argument.equals("stop") || argument.equals("status")) {
+         // the command word itself, nothing to record
+         continue;
+       }
+
+       if (!argument.startsWith("-dir=")) {
+         throw new IllegalArgumentException(LocalizedStrings.CacheServerLauncher_UNKNOWN_ARGUMENT_0
+             .toLocalizedString(argument));
+       }
+
+       processDirOption(stopOptions, argument.substring(argument.indexOf("=") + 1));
+     }
+
+     return stopOptions;
+   }
+
+   /**
+    * Spawns a separate VM that hosts a cache server, configured from the
+    * command line. Output produced while that VM starts is written to the
+    * {@linkplain #startLogFileName start log file}. Unless
+    * {@link #DONT_EXIT_AFTER_LAUNCH} is set, this VM exits with code 0 once
+    * the wait for the running state has finished.
+    *
+    * See #getStartOptions
+    * @see OSProcess#bgexec(String[], File, File, boolean, Map)
+    */
+   public void start(final String[] args) throws Exception {
+     final Map<String, Object> startOptions = getStartOptions(args);
+     workingDir = (File) startOptions.get(DIR);
+
+     // Refuse to start when a cache server is already running in the working directory.
+     // See bug 32574.
+     verifyAndClearStatus();
+
+     // launch the GemFire Cache Server process...
+     final String[] launchCommand = buildCommandLine(startOptions);
+     runCommandLine(startOptions, launchCommand);
+
+     // ...and wait for status.state == RUNNING
+     waitForRunning();
+
+     if (!DONT_EXIT_AFTER_LAUNCH) {
+       System.exit(0);
+     }
+   }
+
+   /**
+    * Fails if the working directory holds a status other than
+    * {@link #SHUTDOWN}; otherwise deletes the status file.
+    *
+    * @throws IllegalStateException if a server is already running there
+    */
+   private void verifyAndClearStatus() throws Exception {
+     final Status existing = getStatus();
+     final boolean stillActive = existing != null && existing.state != SHUTDOWN;
+
+     if (stillActive) {
+       throw new IllegalStateException(LocalizedStrings.CacheServerLauncher_A_0_IS_ALREADY_RUNNING_IN_DIRECTORY_1_2
+           .toLocalizedString(this.baseName, workingDir, existing));
+     }
+
+     deleteStatus();
+   }
+
+   /**
+    * Assembles the command used to launch the server VM: the java command for
+    * this launcher's class, followed by "server" and whatever
+    * <code>addToServerCommand</code> appends.
+    */
+   private String[] buildCommandLine(final Map<String, Object> options) {
+     final String classpath = (String) options.get(CLASSPATH);
+     final List<String> jvmArguments = (List<String>) options.get(VMARGS);
+
+     final List<String> command =
+         JavaCommandBuilder.buildCommand(this.getClass().getName(), classpath, null, jvmArguments);
+     command.add("server");
+     addToServerCommand(command, options);
+
+     final String[] asArray = new String[command.size()];
+     return command.toArray(asArray);
+   }
+
+ private void printCommandLine(final String[] commandLine) {
+ if (PRINT_LAUNCH_COMMAND) {
+ System.out.println("Starting " + this.baseName + " with command:");
+ for (final String command : commandLine) {
+ System.out.print(command);
+ System.out.print(" ");
+ }
+ System.out.println();
+ }
+ }
+
+   /**
+    * Launches the given command in the background from the working directory,
+    * sending its output to the start log file, and reports the new process id.
+    *
+    * @return the id returned by <code>OSProcess.bgexec</code>
+    * @throws IOException if an existing start log file cannot be deleted
+    */
+   private int runCommandLine(final Map<String, Object> options, final String[] commandLine) throws Exception {
+     final File startLog = new File(workingDir, startLogFileName).getAbsoluteFile(); // see bug 32548
+
+     final boolean staleLogRemains = startLog.exists() && !startLog.delete();
+     if (staleLogRemains) {
+       throw new IOException("Unable to delete start log file (" + startLog.getAbsolutePath() + ")!");
+     }
+
+     // read the passwords from command line
+     final Map<String, String> environment = new HashMap<String, String>();
+     SocketCreator.readSSLProperties(environment);
+
+     printCommandLine(commandLine);
+
+     final int processId = OSProcess.bgexec(commandLine, workingDir, startLog, false, environment);
+     printStartMessage(options, processId);
+     return processId;
+   }
+
+ protected void printStartMessage(final Map<String, Object> options, final int pid) throws Exception {
+ System.out.println(LocalizedStrings.CacheServerLauncher_STARTING_0_WITH_PID_1.toLocalizedString(this.baseName, pid));
+ }
+
+   /**
+    * Writes a {@link #RUNNING} status, tagged with this process's id, to the
+    * status file. A failure is printed to standard error and not rethrown.
+    */
+   public void running() {
+     try {
+       final Status runningStatus = createStatus(this.baseName, RUNNING, OSProcess.getId());
+       writeStatus(runningStatus);
+     }
+     catch (Exception e) {
+       e.printStackTrace();
+     }
+   }
+
+ /**
+ * Port requested with <code>-server-port</code>. {@link #server} sets it on the thread it runs on
+ * and {@link #createCache} reads it (through {@link #getServerPort()}) for the default cache server.
+ */
+ public static ThreadLocal<Integer> serverPort = new ThreadLocal<Integer>();
+
+ /**
+ * Bind address requested with <code>-server-bind-address</code>; set by {@link #server} and read by
+ * {@link #createCache} for the default cache server.
+ */
+ public static ThreadLocal<String> serverBindAddress = new ThreadLocal<String>();
+
+ /** Returns the calling thread's server port, or <code>null</code> if none has been set on it. */
+ public static Integer getServerPort() {
+ return serverPort.get();
+ }
+
+ /** Returns the calling thread's server bind address, or <code>null</code> if none has been set on it. */
+ public static String getServerBindAddress() {
+ return serverBindAddress.get();
+ }
+
+ /**
+ * Whether <code>-disable-default-server</code> was given; set by {@link #server}. When it is
+ * <code>null</code> or <code>false</code>, {@link #createCache} may create a default cache server.
+ */
+ public static ThreadLocal<Boolean> disableDefaultServer = new ThreadLocal<Boolean>();
+
+ /** Returns the calling thread's disable-default-server flag, or <code>null</code> if none has been set on it. */
+ public static Boolean getDisableDefaultServer() {
+ return disableDefaultServer.get();
+ }
+
+ /**
+ * The method that does the work of being a cache server. It is
+ * invoked in the VM spawned by the {@link #start} method.
+ * Basically, it creates a GemFire {@link Cache} based on
+ * configuration passed in from the command line. (It will also
+ * take <code>gemfire.properties</code>, etc. into account, just
+ * like an application cache.)
+ *
+ * <P>
+ *
+ * After creating the cache and setting the server's status to {@link
+ * #RUNNING}, it periodically monitors the status, waiting for it to
+ * change to {@link #SHUTDOWN_PENDING} (see {@link #stop}). When
+ * the status does change, it closes the <code>Cache</code> and sets
+ * the status to be {@link #SHUTDOWN}.
+ *
+ * <P>
+ *
+ * The monitoring loop has no normal exit: this method only ends by
+ * calling <code>System.exit(0)</code> once the distributed system is
+ * found disconnected (and not reconnected), or by throwing.
+ *
+ * @param args Configuration options passed in from the command line
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
+ public void server(final String[] args) throws Exception {
+ isDedicatedCacheServer = true;
+ SystemFailure.setExitOK(true);
+
+ final Map<String, Object> options = getServerOptions(args);
+
+ final String serverPortString = (String) options.get(SERVER_PORT);
+
+ if (serverPortString != null) {
+ serverPort.set(Integer.parseInt(serverPortString));
+ }
+
+ // Publish the remaining server settings through the thread-locals read by createCache().
+ serverBindAddress.set((String) options.get(SERVER_BIND_ADDRESS));
+ disableDefaultServer.set((Boolean) options.get(DISABLE_DEFAULT_SERVER));
+ // NOTE(review): this replaces any working directory getServerOptions() took from -dir=...;
+ // presumably the launcher already started this VM in that directory - confirm.
+ workingDir = new File(System.getProperty("user.dir"));
+
+ // Say that we're starting...
+ Status originalStatus = createStatus(this.baseName, STARTING, OSProcess.getId());
+ status = originalStatus;
+ writeStatus(status);
+
+ // Connect to the distributed system. The properties will
+ // properly configure logging, the declarative caching file, etc.
+ final Properties props = (Properties) options.get(PROPERTIES);
+
+ if (props.getProperty(DistributionConfig.LOG_FILE_NAME) == null && CacheServerLauncher.isLoggingToStdOut()) {
+ // Check First if the gemfire.properties set the log-file. If they do, we shouldn't override that default
+ final Properties gemfireProperties = new Properties();
+
+ DistributionConfigImpl.loadGemFireProperties(gemfireProperties);
+
+ if (gemfireProperties.get(DistributionConfig.LOG_FILE_NAME) == null) {
+ // Do not allow the cache server to log to stdout, override the logger with #defaultLogFileName
+ props.setProperty(DistributionConfig.LOG_FILE_NAME, defaultLogFileName);
+ }
+ }
+
+ InternalDistributedSystem system = this.connect(props);
+
+ // The startup listener is given the STARTING status created above; it is removed again
+ // by clearLogListener() once the server has been marked as running.
+ installLogListener();
+
+ logger = system.getLogWriter().convertToLogWriterI18n();
+ // redirect output to the log file
+ OSProcess.redirectOutput(system.getConfig().getLogFile());
+
+ Cache cache = this.createCache(system, options);
+ cache.setIsServer(true);
+ startAdditionalServices(cache, options);
+
+ this.running();
+
+ clearLogListener();
+
+ if (ASSIGN_BUCKETS) {
+ for (PartitionedRegion region : ((GemFireCacheImpl) cache).getPartitionedRegions()) {
+ PartitionRegionHelper.assignBucketsToPartitions(region);
+ }
+ }
+
+ if (Boolean.TRUE.equals(options.get(REBALANCE))) {
+ cache.getResourceManager().createRebalanceFactory().start();
+ }
+
+ File statusFile = new File( workingDir, statusName );
+ long lastModified=0, oldModified = statusFile.lastModified();
+ // Every FORCE_STATUS_FILE_READ_ITERATION_COUNT iterations, read the status file despite the modification time
+ // to catch situations where the file is modified quicker than the file timestamp's resolution.
+ short count = 0;
+ // Ensures the "could not recreate status file" warning is logged at most once.
+ boolean loggedWarning = false;
+ while(true) {
+ lastModified = statusFile.lastModified();
+ if (lastModified > oldModified || count++ == FORCE_STATUS_FILE_READ_ITERATION_COUNT) {
+ count = 0;
+ Thread.sleep( 500 ); // allow for it to be finished writing.
+ //Sometimes the status file is partially written causing readObject to
+ //fail, sleep and retry.
+ // Up to three attempts with increasing sleeps. On the last attempt only a missing
+ // file is handled; any other IOException propagates out of this method.
+ try {
+ status = readStatus( );
+ } catch(IOException ioeSecondChance) {
+ Thread.sleep(1000);
+ try {
+ status = readStatus( );
+ } catch(IOException ioeThirdChance) {
+ Thread.sleep(5000);
+ try {
+ status = readStatus( );
+ } catch (FileNotFoundException fnfe) {
+ // See bug 44627.
+ // The cache server used to just shutdown at this point. Instead,
+ // recreate the status file if possible and continue.
+ status = createStatus(this.baseName, RUNNING, originalStatus.pid);
+ try {
+ writeStatus(status);
+ } catch (FileNotFoundException e) {
+ if (!loggedWarning) {
+ logger.warning(LocalizedStrings.CacheServerLauncher_CREATE_STATUS_EXCEPTION_0, e.toString());
+ loggedWarning = true;
+ }
+ }
+ }
+ }
+ }
+ oldModified = lastModified;
+ if (status.state == SHUTDOWN_PENDING) {
+ // A stop was requested: shut everything down and record SHUTDOWN. The loop itself
+ // keeps going; the VM exits through the connectivity check below.
+ stopAdditionalServices();
+ this.disconnect(cache);
+ status.state = SHUTDOWN;
+ writeStatus(status);
+ } else {
+ Thread.sleep( 250 );
+ }
+
+ } else {
+ Thread.sleep(1000);
+ }
+ if (!system.isConnected()) {
+// System.out.println("System is disconnected. isReconnecting = " + system.isReconnecting());
+ boolean reconnected = false;
+ if (system.isReconnecting()) {
+ // NOTE(review): -1 presumably means "wait without a time limit" - confirm against waitUntilReconnected.
+ reconnected = system.waitUntilReconnected(-1, TimeUnit.SECONDS);
+ if (reconnected) {
+ // Continue monitoring with the replacement system and its cache.
+ system = (InternalDistributedSystem)system.getReconnectedSystem();
+ cache = GemFireCacheImpl.getInstance();
+ }
+ }
+ if (!reconnected) {
+ // shutdown-all disconnected the DS
+ System.exit(0);
+ }
+ }
+ }
+ }
+
+ private void installLogListener() {
+ MainLogReporter reporter = new MainLogReporter(this.status);
+ StartupStatus.setListener(reporter);
+ reporter.setDaemon(true);
+ reporter.start();
+ }
+
+ private void clearLogListener() {
+ MainLogReporter mainLogListener = (MainLogReporter) StartupStatus.getStartupListener();
+ if(mainLogListener != null) {
+ mainLogListener.shutdown();
+ StartupStatus.clearListener();
+ }
+ }
+
+ protected InternalDistributedSystem connect(Properties props) {
+ return (InternalDistributedSystem)DistributedSystem.connect(props);
+ }
+
+ protected static float getCriticalHeapPercent(Map<String, Object> options) {
+ if (options != null) {
+ String criticalHeapThreshold = (String)options
+ .get(CRITICAL_HEAP_PERCENTAGE);
+ if (criticalHeapThreshold != null) {
+ return Float.parseFloat(criticalHeapThreshold
+ .substring(criticalHeapThreshold.indexOf("=") + 1));
+ }
+ }
+ return -1.0f;
+ }
+
+ protected static float getEvictionHeapPercent(Map<String, Object> options) {
+ if (options != null) {
+ String evictionHeapThreshold = (String)options
+ .get(EVICTION_HEAP_PERCENTAGE);
+ if (evictionHeapThreshold != null) {
+ return Float.parseFloat(evictionHeapThreshold
+ .substring(evictionHeapThreshold.indexOf("=") + 1));
+ }
+ }
+ return -1.0f;
+ }
+
+ protected static float getCriticalOffHeapPercent(Map<String, Object> options) {
+ if (options != null) {
+ String criticalOffHeapThreshold = (String)options
+ .get(CRITICAL_OFF_HEAP_PERCENTAGE);
+ if (criticalOffHeapThreshold != null) {
+ return Float.parseFloat(criticalOffHeapThreshold
+ .substring(criticalOffHeapThreshold.indexOf("=") + 1));
+ }
+ }
+ return -1.0f;
+ }
+
+ protected static float getEvictionOffHeapPercent(Map<String, Object> options) {
+ if (options != null) {
+ String evictionOffHeapThreshold = (String)options
+ .get(EVICTION_OFF_HEAP_PERCENTAGE);
+ if (evictionOffHeapThreshold != null) {
+ return Float.parseFloat(evictionOffHeapThreshold
+ .substring(evictionOffHeapThreshold.indexOf("=") + 1));
+ }
+ }
+ return -1.0f;
+ }
+
+   /**
+    * Creates the cache for the connected system, applies the heap and off-heap
+    * thresholds given on the command line and, unless disabled, creates and
+    * starts a default cache server.
+    *
+    * Each threshold is applied only when its parsed value is greater than zero.
+    * A default cache server is added only when the disable-default-server flag
+    * is unset or false AND the cache has no cache servers yet; it uses the port
+    * and bind address from the launcher's thread-locals when those are set.
+    *
+    * @param system the connected distributed system
+    * @param options the option map produced by {@link #getServerOptions}
+    * @return the created cache
+    * @throws IOException declared for failures while starting the default cache server
+    */
+   protected Cache createCache(InternalDistributedSystem system, Map<String, Object> options) throws IOException {
+     Cache cache = CacheFactory.create(system);
+
+     float threshold = getCriticalHeapPercent(options);
+     if (threshold > 0.0f) {
+       cache.getResourceManager().setCriticalHeapPercentage(threshold);
+     }
+     threshold = getEvictionHeapPercent(options);
+     if (threshold > 0.0f) {
+       cache.getResourceManager().setEvictionHeapPercentage(threshold);
+     }
+
+     threshold = getCriticalOffHeapPercent(options);
+     if (threshold > 0.0f) {
+       cache.getResourceManager().setCriticalOffHeapPercentage(threshold);
+     }
+     threshold = getEvictionOffHeapPercent(options);
+     if (threshold > 0.0f) {
+       cache.getResourceManager().setEvictionOffHeapPercentage(threshold);
+     }
+
+
+     // Create and start a default cache server
+     // If (disableDefaultServer is not set or it is set but false) AND (the number of cacheservers is 0)
+     Boolean disable = disableDefaultServer.get();
+     if ((disable == null || !disable) && cache.getCacheServers().isEmpty()) {
+       // Create and add a cache server
+       CacheServer server = cache.addCacheServer();
++
++      CacheServerHelper.setIsDefaultServer(server);
+
+       // Set its port if necessary
+       Integer port = CacheServerLauncher.getServerPort();
+       if (port != null) {
+         server.setPort(port);
+       }
+
+       // Set its bind address if necessary
+       String bindAddress = getServerBindAddress();
+       if (bindAddress != null) {
+         server.setBindAddress(bindAddress.trim());
+       }
+
+       // Start it
+       server.start();
+     }
+
+     return cache;
+   }
+
+ protected void disconnect(Cache cache) {
+ DistributedSystem dsys = cache.getDistributedSystem();
+ cache.close();
+ dsys.disconnect();
+ }
+
+ /**
+  * Stops a cache server (which is running in a different VM) by setting its status to {@link #SHUTDOWN_PENDING}.
+  * Waits for the cache server to actually shut down.
+  * <p>
+  * Unless <code>DONT_EXIT_AFTER_LAUNCH</code> is set, this method finishes by calling <code>System.exit</code>
+  * with 0 if the server was observed in the {@link #SHUTDOWN} state and with 1 otherwise.
+  *
+  * @param args the arguments from which the stop options (in particular the working directory) are parsed
+  * @throws Exception if the status file exists but no status could be read from it, or if reading, writing
+  * or deleting the status file fails
+  */
+ public void stop(final String[] args) throws Exception {
+   this.workingDir = (File) getStopOptions(args).get(DIR);
+
+   // determine the current state of the Cache Server process...
+   final File statusFile = new File(this.workingDir, this.statusName);
+   int exitStatus = 1;
+
+   if (statusFile.exists()) {
+     this.status = spinReadStatus();
+
+     // spinReadStatus gives up with null after its timeout or when interrupted; fail with a clear message
+     // instead of a NullPointerException on the dereference below
+     if (this.status == null) {
+       throw new Exception(LocalizedStrings.CacheServerLauncher_NO_AVAILABLE_STATUS.toLocalizedString());
+     }
+
+     // upon reading the status file, request the Cache Server to shutdown if it has not already...
+     if (this.status.state != SHUTDOWN) {
+       // copy server PID and not use own PID; see bug #39707
+       this.status = createStatus(this.baseName, SHUTDOWN_PENDING, this.status.pid);
+       writeStatus(this.status);
+     }
+
+     // poll the Cache Server for a response to our shutdown request (passes through if the Cache Server
+     // has already shutdown)...
+     pollCacheServerForShutdown();
+
+     // after polling, determine the status of the Cache Server one last time and determine how to exit...
+     if (this.status.state == SHUTDOWN) {
+       System.out.println(LocalizedStrings.CacheServerLauncher_0_STOPPED.toLocalizedString(this.baseName));
+       deleteStatus();
+       exitStatus = 0;
+     }
+     else {
+       System.out.println(LocalizedStrings.CacheServerLauncher_TIMEOUT_WAITING_FOR_0_TO_SHUTDOWN_STATUS_IS_1
+         .toLocalizedString(this.baseName, this.status));
+     }
+   }
+   else {
+     System.out.println(LocalizedStrings.CacheServerLauncher_THE_SPECIFIED_WORKING_DIRECTORY_0_CONTAINS_NO_STATUS_FILE
+       .toLocalizedString(this.workingDir));
+   }
+
+   if (DONT_EXIT_AFTER_LAUNCH) {
+     return;
+   }
+
+   System.exit(exitStatus);
+ }
+
+ /**
+  * Re-reads the status file every 250 milliseconds until the server reports {@link #SHUTDOWN} or
+  * <code>SHUTDOWN_WAIT_TIME</code> milliseconds of sleep have accumulated. The last status read is left in
+  * the <code>status</code> field for the caller to inspect.
+  *
+  * @throws InterruptedException if reading the status is interrupted
+  */
+ private void pollCacheServerForShutdown() throws InterruptedException {
+   final int increment = 250; // unit is in milliseconds
+   int clock = 0;
+
+   // wait for at most SHUTDOWN_WAIT_TIME milliseconds
+   while (clock < SHUTDOWN_WAIT_TIME && status.state != SHUTDOWN) {
+     try {
+       status = readStatus();
+     }
+     catch (IOException ignore) {
+       // best effort: keep the previous status and try again on the next iteration
+     }
+
+     try {
+       Thread.sleep(increment);
+     }
+     catch (InterruptedException ie) {
+       // stop polling, but keep the interrupt visible to the caller instead of silently dropping it
+       Thread.currentThread().interrupt();
+       break;
+     }
+
+     clock += increment;
+   }
+ }
+
+ /**
+  * The state of a cache server as it is persisted on disk. Instances
+  * of this class are serialized to a {@linkplain #statusName file}
+  * and read back by the launcher.
+  *
+  * @see #SHUTDOWN
+  * @see #STARTING
+  * @see #RUNNING
+  * @see #SHUTDOWN_PENDING
+  */
+ static class Status implements Serializable {
+
+   private static final long serialVersionUID = 190943081363646485L;
+   public int state = 0;
+   public int pid = 0;
+
+   private final String baseName;
+   public Throwable exception;
+   public String msg;
+   public String dsMsg;
+
+   public Status(String baseName) {
+     this.baseName = baseName;
+   }
+
+   /**
+    * Renders the base name, pid and state on one line, followed by either an error hint (when an exception
+    * is recorded) or the distributed system message (when one is present).
+    */
+   @Override
+   public String toString() {
+     final StringBuilder text = new StringBuilder();
+     text.append(this.baseName);
+     text.append(" pid: ");
+     text.append(this.pid);
+     text.append(" status: ");
+     text.append(describeState());
+
+     if (this.exception == null) {
+       // no failure recorded: the optional progress message is the only extra line
+       if (this.dsMsg != null) {
+         text.append('\n').append(this.dsMsg);
+       }
+       return text.toString();
+     }
+
+     text.append("\n");
+     if (this.msg == null) {
+       text.append("Exception in ").append(this.baseName);
+     }
+     else {
+       text.append(this.msg);
+     }
+     text.append(" - ");
+     text.append(LocalizedStrings.CacheServerLauncher_SEE_LOG_FILE_FOR_DETAILS.toLocalizedString());
+     return text.toString();
+   }
+
+   /**
+    * Maps the numeric state onto the word printed by {@link #toString()}.
+    */
+   private String describeState() {
+     switch (this.state) {
+       case SHUTDOWN:
+         return "stopped";
+       case STARTING:
+         return "starting";
+       case RUNNING:
+         return "running";
+       case SHUTDOWN_PENDING:
+         return "stopping";
+       default:
+         return "unknown";
+     }
+   }
+ }
+
+ /**
+ * Notes that an error has occurred in the cache server and that it
+ * has shut down because of it.
+ */
+ protected void setServerError(final String msg, final Throwable t) {
+ try {
+ writeStatus(createStatus(this.baseName, SHUTDOWN, OSProcess.getId(), msg, t));
+ }
+ catch (Exception e) {
+ if (logger != null) {
+ logger.severe(e);
+ }
+ else {
+ e.printStackTrace();
+ }
+ System.exit(1);
+ }
+ }
+
+ /**
+  * Persists the given <code>Status</code> by serializing it to the
+  * status file in the server's working directory.
+  *
+  * @param s the status to write
+  * @throws IOException if the file cannot be opened or written
+  */
+ public void writeStatus(final Status s) throws IOException {
+   final File target = new File(workingDir, statusName);
+   FileOutputStream rawOut = null;
+   ObjectOutputStream serializer = null;
+
+   try {
+     rawOut = new FileOutputStream(target);
+     serializer = new ObjectOutputStream(rawOut);
+     serializer.writeObject(s);
+     serializer.flush();
+   }
+   finally {
+     // close the wrapping stream first, then the underlying file stream
+     IOUtils.close(serializer);
+     IOUtils.close(rawOut);
+   }
+ }
+
+ /**
+  * Reads a cache server's status, retrying every 500 milliseconds for up to 60 seconds when a read fails.
+  *
+  * @return the status, or <code>null</code> if none could be read before the deadline or the thread was
+  * interrupted while waiting to retry
+  */
+ protected Status spinReadStatus() {
+   final long deadline = System.currentTimeMillis() + 60000;
+   Status result = null;
+
+   while (result == null && System.currentTimeMillis() < deadline) {
+     try {
+       result = readStatus();
+     }
+     catch (Exception e) {
+       // try again - the status might have been read in the middle of it being written by the server resulting in
+       // an EOFException here
+       try {
+         Thread.sleep(500);
+       }
+       catch (InterruptedException ie) {
+         // nothing was read in this iteration, so give up with null and keep the interrupt flag set
+         Thread.currentThread().interrupt();
+         return null;
+       }
+     }
+   }
+
+   return result;
+ }
+
+ /**
+ * Reads a cache server's status from a file in its working directory.
+ * <p>
+ * The <code>Status</code> is deserialized from the file named <code>statusName</code> in <code>workingDir</code>.
+ * If {@link #isExistingProcess(int)} reports that the recorded pid no longer exists, a fresh
+ * {@link #SHUTDOWN} status carrying the same pid is returned instead of the one read from disk.
+ * <p>
+ * When the file cannot be opened, this method sleeps for 500 milliseconds and, if the file exists by then,
+ * retries by calling itself; otherwise the original <code>FileNotFoundException</code> is rethrown.
+ * Both streams are closed in all cases.
+ *
+ * @return the status read from disk, or a replacement SHUTDOWN status as described above
+ * @throws InterruptedException if the thread is interrupted while waiting to retry
+ * @throws IOException if the file is missing or cannot be read
+ * @throws RuntimeException wrapping a <code>ClassNotFoundException</code> raised during deserialization
+ */
+ protected Status readStatus() throws InterruptedException, IOException {
+ final File statusFile = new File(workingDir, statusName);
+
+ FileInputStream fileInput = null;
+ ObjectInputStream objectInput = null;
+
+ try {
+ fileInput = new FileInputStream(statusFile);
+ objectInput = new ObjectInputStream(fileInput);
+
+ Status status = (Status) objectInput.readObject();
+
+ // See bug 32760
+ // Note, only execute the conditional createStatus statement if we are in native mode; if we are in pure Java mode
+ // the process ID identified in the Status object is assumed to exist!
+ if (!isExistingProcess(status.pid)) {
+ status = createStatus(this.baseName, SHUTDOWN, status.pid);
+ }
+
+ return status;
+ }
+ catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ catch (FileNotFoundException e) {
+ Thread.sleep(500);
+
+ if (statusFile.exists()) {
+ return readStatus();
+ }
+ else {
+ throw e;
+ }
+ }
+ finally {
+ IOUtils.close(objectInput);
+ IOUtils.close(fileInput);
+ }
+ }
+
+ /**
+  * Deletes the cache server's status file if it is present.
+  *
+  * @throws IOException if the file exists but could not be deleted
+  */
+ private void deleteStatus() throws IOException {
+   final File statusFile = new File(workingDir, statusName);
+
+   // nothing to remove
+   if (!statusFile.exists()) {
+     return;
+   }
+
+   final boolean deleted = statusFile.delete();
+   if (!deleted) {
+     throw new IOException("Could not delete status file (" + statusFile.getAbsolutePath() + ")!");
+   }
+ }
+
+ /**
+  * Tells whether the process with the given id should be treated as alive.
+  *
+  * @param pid the process id to check
+  * @return <code>true</code> in pure Java mode, otherwise <code>true</code> only for a non-zero pid that
+  * <code>OSProcess.exists</code> confirms
+  */
+ protected boolean isExistingProcess(final int pid) {
+   // in pure Java mode the pid is not checked at all
+   if (PureJavaMode.isPure()) {
+     return true;
+   }
+   if (pid == 0) {
+     return false;
+   }
+   return OSProcess.exists(pid);
+ }
+
+ /**
+  * Waits until the cache server has left the {@link #STARTING} state and prints its final status.
+  * <p>
+  * While the server is starting, the status is re-read every 500 milliseconds. When the distributed system
+  * message has stayed the same for more than <code>STATUS_WAIT_TIME</code> milliseconds and has not been
+  * reported yet, a progress line is printed. If a server that was starting ends up in {@link #SHUTDOWN},
+  * its status is printed and the VM exits with status 1.
+  *
+  * @throws Exception if no status can be read, either initially or while polling
+  */
+ protected void waitForRunning() throws Exception {
+   Status status = spinReadStatus();
+   if (status == null) {
+     throw new Exception(LocalizedStrings.CacheServerLauncher_NO_AVAILABLE_STATUS.toLocalizedString());
+   }
+
+   if (status.state == STARTING) {
+     String lastReadMessage = null;
+     String lastReportedMessage = null;
+     long lastReadTime = System.nanoTime();
+
+     // re-read status for a while...
+     while (status.state == STARTING) {
+       Thread.sleep(500); // fix for bug 36998
+       status = spinReadStatus();
+
+       // spinReadStatus returns null after its timeout or on interrupt; report that instead of
+       // failing with a NullPointerException below
+       if (status == null) {
+         throw new Exception(LocalizedStrings.CacheServerLauncher_NO_AVAILABLE_STATUS.toLocalizedString());
+       }
+
+       // check to see if the status message has changed
+       if (status.dsMsg != null && !status.dsMsg.equals(lastReadMessage)) {
+         lastReadMessage = status.dsMsg;
+         lastReadTime = System.nanoTime();
+       }
+
+       // if the status message has not changed for longer than STATUS_WAIT_TIME milliseconds,
+       // print it once
+       long elapsed = System.nanoTime() - lastReadTime;
+       if (TimeUnit.NANOSECONDS.toMillis(elapsed) > STATUS_WAIT_TIME
+           && lastReadMessage != null
+           && !lastReadMessage.equals(lastReportedMessage)) {
+         long elapsedSec = TimeUnit.NANOSECONDS.toSeconds(elapsed);
+         System.out.println(LocalizedStrings.CacheServerLauncher_LAUNCH_IN_PROGRESS_0
+           .toLocalizedString(elapsedSec, status.dsMsg));
+         lastReportedMessage = lastReadMessage;
+       }
+     }
+
+     // the server went from starting straight to stopped: treat it as a failed launch
+     if (status.state == SHUTDOWN) {
+       System.out.println(status);
+       System.exit(1);
+     }
+   }
+
+   System.out.println(status);
+ }
+
+ /**
+  * Reads {@link DistributedSystem#PROPERTY_FILE} and determines whether the
+  * {@link DistributionConfig#LOG_FILE_NAME} property is left unset or empty there.
+  * If the property file cannot be read, a message is printed and the VM exits with status 1.
+  *
+  * @return true if no property file was found or it does not name a log file, i.e. the logging would
+  * go to stdout
+  */
+ private static boolean isLoggingToStdOut() {
+   final URL url = DistributedSystem.getPropertyFileURL();
+   if (url == null) {
+     // Didn't find a property file, assuming the default is to log to stdout
+     return true;
+   }
+
+   final Properties gfprops = new Properties();
+   java.io.InputStream in = null;
+   try {
+     in = url.openStream();
+     gfprops.load(in);
+   } catch (IOException io) {
+     System.out.println("Failed reading " + url);
+     System.exit( 1 );
+   } finally {
+     // the stream was previously left open; release it once the properties are loaded
+     if (in != null) {
+       try {
+         in.close();
+       } catch (IOException ignored) {
+         // nothing useful can be done if closing a read-only stream fails
+       }
+     }
+   }
+
+   final String logFile = gfprops.getProperty(DistributionConfig.LOG_FILE_NAME);
+   return logFile == null || logFile.length() == 0;
+ }
+
+ /**
+ * Process information contained in the options map and add to the command
+ * line of the subprocess as needed.
+ */
+ protected void addToServerCommand(final List<String> commandLine, final Map<String, Object> options) {
+ final ListWrapper<String> commandLineWrapper = new ListWrapper<String>(commandLine);
+
+ if (Boolean.TRUE.equals(options.get(REBALANCE))) {
+ commandLineWrapper.add("-rebalance");
+ }
+
+ commandLineWrapper.add((String) options.get(DISABLE_DEFAULT_SERVER));
+ commandLineWrapper.add((String) options.get(SERVER_PORT));
+ commandLineWrapper.add((String) options.get(SERVER_BIND_ADDRESS));
+
+ String criticalHeapThreshold = (String)options.get(CRITICAL_HEAP_PERCENTAGE);
+ if (criticalHeapThreshold != null) {
+ commandLineWrapper.add(criticalHeapThreshold);
+ }
+ String evictionHeapThreshold = (String)options
+ .get(EVICTION_HEAP_PERCENTAGE);
+ if (evictionHeapThreshold != null) {
+ commandLineWrapper.add(evictionHeapThreshold);
+ }
+
+ String criticalOffHeapThreshold = (String)options.get(CRITICAL_OFF_HEAP_PERCENTAGE);
+ if (criticalOffHeapThreshold != null) {
+ commandLineWrapper.add(criticalOffHeapThreshold);
+ }
+ String evictionOffHeapThreshold = (String)options
+ .get(EVICTION_OFF_HEAP_PERCENTAGE);
+ if (evictionOffHeapThreshold != null) {
+ commandLineWrapper.add(evictionOffHeapThreshold);
+ }
+
+ final Properties props = (Properties) options.get(PROPERTIES);
+
+ for (final Object key : props.keySet()) {
+ commandLineWrapper.add(key + "=" + props.getProperty(key.toString()));
+ }
+
+ if (props.getProperty(DistributionConfig.LOG_FILE_NAME) == null && CacheServerLauncher.isLoggingToStdOut()) {
+ // Do not allow the cache server to log to stdout; override the logger with #defaultLogFileName
+ commandLineWrapper.add(DistributionConfig.LOG_FILE_NAME + "=" + defaultLogFileName);
+ }
+ }
+
+ /**
+ * Extension hook; this implementation does nothing.
+ * <p>
+ * This method is called immediately following cache creation in the spawned
+ * process, but prior to setting the RUNNING flag in the status file. So the
+ * spawning process will block until this method completes.
+ *
+ * @param cache the cache that has just been created
+ * @param options the launcher options (presumably the same map the cache was created from; confirm at the call site)
+ * @throws Exception declared so that overriding implementations may fail
+ */
+ protected void startAdditionalServices(final Cache cache, final Map<String, Object> options) throws Exception {
+ }
+
+ /**
+ * Extension hook; this implementation does nothing.
+ * <p>
+ * This method is called prior to DistributedSystem.disconnect(). Care should
+ * be taken not to take too long in this method or else
+ * #CacheServerLauncher.stop may time out.
+ *
+ * @throws Exception declared so that overriding implementations may fail
+ */
+ protected void stopAdditionalServices() throws Exception {
+ }
+
+ /**
+  * A List view over another List that disallows null values: attempts to add or set a null element
+  * are silently ignored and leave the backing list unchanged.
+  * @param <E> the Class type for the List elements.
+  */
+ protected static class ListWrapper<E> extends AbstractList<E> {
+
+   private final List<E> list;
+
+   /**
+    * @param list the backing list that receives all non-null elements; must not be null
+    */
+   public ListWrapper(final List<E> list) {
+     assert list != null : "The List cannot be null!";
+     this.list = list;
+   }
+
+   /**
+    * Appends the element unless it is null.
+    *
+    * @return true if the element was appended, false if it was null and therefore ignored
+    */
+   @Override
+   public boolean add(final E e) {
+     // decide here rather than handing the outcome of add(int, E) back through a static ThreadLocal
+     if (e == null) {
+       return false;
+     }
+     add(size(), e);
+     return true;
+   }
+
+   /**
+    * Inserts the element at the given index unless it is null, in which case nothing happens.
+    */
+   @Override
+   public void add(final int index, final E element) {
+     if (element != null) {
+       list.add(index, element);
+     }
+   }
+
+   @Override
+   public E get(final int index) {
+     return this.list.get(index);
+   }
+
+   @Override
+   public E remove(final int index) {
+     return list.remove(index);
+   }
+
+   /**
+    * Replaces the element at the given index unless the new element is null.
+    *
+    * @return the element previously at the index, or the current element when the replacement was null
+    */
+   @Override
+   public E set(final int index, final E element) {
+     return (element != null ? list.set(index, element) : list.get(index));
+   }
+
+   @Override
+   public int size() {
+     return list.size();
+   }
+ }
+
+ /**
+  * Background thread that copies the most recent startup status message into the given
+  * <code>Status</code> and rewrites the status file whenever that message changes. It wakes up about once
+  * a second until {@link #shutdown()} is called.
+  */
+ private class MainLogReporter extends Thread implements StartupStatusListener {
+   // Written by setStatus and read by run(); presumably from different threads, hence volatile.
+   private volatile String lastLogMessage;
+   private final Status status;
+   // only accessed while holding this object's monitor
+   boolean running = true;
+
+   public MainLogReporter(Status status) {
+     this.status = status;
+   }
+
+   /**
+    * Stops the reporter, clears the message held in the status and wakes the reporting loop.
+    */
+   public synchronized void shutdown() {
+     this.running = false;
+     this.status.dsMsg = null;
+     this.notifyAll();
+   }
+
+   /**
+    * Remembers the latest startup message; it is picked up by the next pass of {@link #run()}.
+    */
+   @Override
+   public void setStatus(String status) {
+     lastLogMessage = status;
+   }
+
+   @Override
+   public synchronized void run() {
+     while (running) {
+       try {
+         wait(1000);
+       } catch (InterruptedException e) {
+         // this should not happen; stop reporting but keep the interrupt flag set
+         Thread.currentThread().interrupt();
+         break;
+       }
+
+       // read the field once so the comparison and the assignment use the same value
+       final String latest = lastLogMessage;
+       if (running && !sameMessage(latest, status.dsMsg)) {
+         status.dsMsg = latest;
+         try {
+           writeStatus(status);
+         } catch (IOException e) {
+           // this could happen if there was a concurrent write to the file
+           // eg a stop; simply try again on the next pass
+         }
+       }
+     }
+   }
+
+   /**
+    * Null-safe comparison of two messages by content rather than by reference.
+    */
+   private boolean sameMessage(final String first, final String second) {
+     return first == null ? second == null : first.equals(second);
+   }
+ }
+}