You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:00 UTC
[38/83] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 3bd1e83,0000000..f3ba7a2
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@@ -1,1109 -1,0 +1,1116 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
+
+import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
+import org.jgroups.Address;
+import org.jgroups.Event;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.Message.Flag;
+import org.jgroups.Message.TransientFlag;
+import org.jgroups.ReceiverAdapter;
+import org.jgroups.View;
+import org.jgroups.ViewId;
+import org.jgroups.conf.ClassConfigurator;
+import org.jgroups.protocols.UDP;
+import org.jgroups.protocols.pbcast.NAKACK2;
+import org.jgroups.stack.IpAddress;
+import org.jgroups.util.Digest;
+import org.jgroups.util.UUID;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.ForcedDisconnectException;
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.SystemConnectException;
+import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+import com.gemstone.gemfire.distributed.DurableClientAttributes;
+import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
++import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
+import com.gemstone.gemfire.internal.ClassPathLoader;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.OSProcess;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.VersionedDataInputStream;
+import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+import com.gemstone.gemfire.internal.cache.DirectReplyMessage;
+import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+public class JGroupsMessenger implements Messenger {
+
+ private static final Logger logger = Services.getLogger();
+
+ /**
+ * The location (in the product) of the locator Jgroups config file.
+ */
+ private static final String DEFAULT_JGROUPS_TCP_CONFIG = "com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-config.xml";
+
+ /**
+ * The location (in the product) of the mcast Jgroups config file.
+ */
+ private static final String JGROUPS_MCAST_CONFIG_FILE_NAME = "com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml";
+
+ /** JG magic numbers for types added to the JG ClassConfigurator */
+ public static final short JGROUPS_TYPE_JGADDRESS = 2000;
+ public static final short JGROUPS_PROTOCOL_TRANSPORT = 1000;
+
+ public static boolean THROW_EXCEPTION_ON_START_HOOK;
+
+ String jgStackConfig;
+
+ JChannel myChannel;
+ InternalDistributedMember localAddress;
+ JGAddress jgAddress;
+ Services services;
+
+ /** handlers that receive certain classes of messages instead of the Manager */
+ Map<Class, MessageHandler> handlers = new ConcurrentHashMap<Class, MessageHandler>();
+
+ private volatile NetView view;
+
+ private GMSPingPonger pingPonger = new GMSPingPonger();
+
+ protected AtomicLong pongsReceived = new AtomicLong(0);
+
+ /**
+ * A set that contains addresses that we have logged JGroups IOExceptions for in the
+ * current membership view and possibly initiated suspect processing. This
+ * reduces the amount of suspect processing initiated by IOExceptions and the
+ * amount of exceptions logged
+ */
+ private Set<Address> addressesWithioExceptionsProcessed = Collections.synchronizedSet(new HashSet<Address>());
+
+ static {
+ // register classes that we've added to jgroups that are put on the wire
+ // or need a header ID
+ ClassConfigurator.add(JGROUPS_TYPE_JGADDRESS, JGAddress.class);
+ ClassConfigurator.addProtocol(JGROUPS_PROTOCOL_TRANSPORT, Transport.class);
+ }
+
+ @Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
+ public void init(Services s) {
+ this.services = s;
+
+ RemoteTransportConfig transport = services.getConfig().getTransport();
+ DistributionConfig dc = services.getConfig().getDistributionConfig();
+
+
+ boolean b = dc.getEnableNetworkPartitionDetection();
+ if (b) {
+ if (!SocketCreator.FORCE_DNS_USE) {
+ SocketCreator.resolve_dns = false;
+ }
+ }
+ System.setProperty("jgroups.resolve_dns", String.valueOf(!b));
+
+ InputStream is= null;
+
+ String r = null;
+ if (transport.isMcastEnabled()) {
+ r = JGROUPS_MCAST_CONFIG_FILE_NAME;
+ } else {
+ r = DEFAULT_JGROUPS_TCP_CONFIG;
+ }
+ is = ClassPathLoader.getLatest().getResourceAsStream(getClass(), r);
+ if (is == null) {
+ throw new GemFireConfigException(LocalizedStrings.GroupMembershipService_CANNOT_FIND_0.toLocalizedString(r));
+ }
+
+ String properties;
+ try {
+ //PlainConfigurator config = PlainConfigurator.getInstance(is);
+ //properties = config.getProtocolStackString();
+ StringBuffer sb = new StringBuffer(3000);
+ BufferedReader br;
+ br = new BufferedReader(new InputStreamReader(is, "US-ASCII"));
+ String input;
+ while ((input=br.readLine()) != null) {
+ sb.append(input);
+ }
+ br.close();
+ properties = sb.toString();
+ }
+ catch (Exception ex) {
+ throw new GemFireConfigException(LocalizedStrings.GroupMembershipService_AN_EXCEPTION_WAS_THROWN_WHILE_READING_JGROUPS_CONFIG.toLocalizedString(), ex);
+ }
+
+
+ if (transport.isMcastEnabled()) {
+ properties = replaceStrings(properties, "MCAST_PORT", String.valueOf(transport.getMcastId().getPort()));
+ properties = replaceStrings(properties, "MCAST_ADDRESS", dc.getMcastAddress().getHostAddress());
+ properties = replaceStrings(properties, "MCAST_TTL", String.valueOf(dc.getMcastTtl()));
+ properties = replaceStrings(properties, "MCAST_SEND_BUFFER_SIZE", String.valueOf(dc.getMcastSendBufferSize()));
+ properties = replaceStrings(properties, "MCAST_RECV_BUFFER_SIZE", String.valueOf(dc.getMcastRecvBufferSize()));
+ properties = replaceStrings(properties, "MCAST_RETRANSMIT_INTERVAL", ""+Integer.getInteger("gemfire.mcast-retransmit-interval", 500));
+ properties = replaceStrings(properties, "RETRANSMIT_LIMIT", String.valueOf(dc.getUdpFragmentSize()-256));
+ }
+
+ if (transport.isMcastEnabled() || transport.isTcpDisabled() ||
+ (dc.getUdpRecvBufferSize() != DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE) ) {
+ properties = replaceStrings(properties, "UDP_RECV_BUFFER_SIZE", ""+dc.getUdpRecvBufferSize());
+ }
+ else {
+ properties = replaceStrings(properties, "UDP_RECV_BUFFER_SIZE", ""+DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE_REDUCED);
+ }
+ properties = replaceStrings(properties, "UDP_SEND_BUFFER_SIZE", ""+dc.getUdpSendBufferSize());
+
+ String str = transport.getBindAddress();
+ // JGroups UDP protocol requires a bind address
+ if (str == null || str.length() == 0) {
+ try {
+ str = SocketCreator.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ throw new GemFireConfigException(e.getMessage(), e);
+ }
+ }
+ properties = replaceStrings(properties, "BIND_ADDR_SETTING", "bind_addr=\""+str+"\"");
+
+ int port = Integer.getInteger("gemfire.jg-bind-port", 0);
+ if (port != 0) {
+ properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_START", ""+port);
+ } else {
+ int[] ports = dc.getMembershipPortRange();
+ properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_START", ""+ports[0]);
+ properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_END", ""+(ports[1]-ports[0]));
+ }
+
+ properties = replaceStrings(properties, "UDP_FRAGMENT_SIZE", ""+dc.getUdpFragmentSize());
+
+ properties = replaceStrings(properties, "FC_MAX_CREDITS", ""+dc.getMcastFlowControl().getByteAllowance());
+ properties = replaceStrings(properties, "FC_THRESHOLD", ""+dc.getMcastFlowControl().getRechargeThreshold());
+ properties = replaceStrings(properties, "FC_MAX_BLOCK", ""+dc.getMcastFlowControl().getRechargeBlockMs());
+
+ this.jgStackConfig = properties;
+
+ }
+
+ @Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
+ public void start() {
+ // create the configuration XML string for JGroups
+ String properties = this.jgStackConfig;
+
+ long start = System.currentTimeMillis();
+
+ // start the jgroups channel and establish the membership ID
+ boolean reconnecting = false;
+ try {
+ Object oldChannel = services.getConfig().getTransport().getOldDSMembershipInfo();
+ if (oldChannel != null) {
+ logger.debug("Reusing JGroups channel from previous system", properties);
+
+ myChannel = (JChannel)oldChannel;
+ // scrub the old channel
+ ViewId vid = new ViewId(new JGAddress(), 0);
+ View jgv = new View(vid, new ArrayList<Address>());
+ this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv));
+ UUID logicalAddress = (UUID)myChannel.getAddress();
+ if (logicalAddress instanceof JGAddress) {
+ ((JGAddress)logicalAddress).setVmViewId(-1);
+ }
+ reconnecting = true;
+ }
+ else {
+ logger.debug("JGroups configuration: {}", properties);
+
+ checkForIPv6();
+ InputStream is = new ByteArrayInputStream(properties.getBytes("UTF-8"));
+ myChannel = new JChannel(is);
+ }
+ } catch (Exception e) {
+ throw new GemFireConfigException("unable to create jgroups channel", e);
+ }
+
+ // give the stats to the jchannel statistics recorder
+ StatRecorder sr = (StatRecorder)myChannel.getProtocolStack().findProtocol(StatRecorder.class);
+ if (sr != null) {
+ sr.setDMStats(services.getStatistics());
+ }
+
+ Transport transport = (Transport)myChannel.getProtocolStack().getTransport();
+ transport.setMessenger(this);
+
+ try {
+ myChannel.setReceiver(null);
+ myChannel.setReceiver(new JGroupsReceiver());
+ if (!reconnecting) {
+ myChannel.connect("AG"); // apache g***** (whatever we end up calling it)
+ }
+ } catch (Exception e) {
+ myChannel.close();
+ throw new SystemConnectException("unable to create jgroups channel", e);
+ }
+
+ if (JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK) {
+ JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK = false;
+ throw new SystemConnectException("failing for test");
+ }
+
+ establishLocalAddress();
+
+ logger.info("JGroups channel {} (took {}ms)", (reconnecting? "reinitialized" : "created"), System.currentTimeMillis()-start);
+
+ }
+
+ /**
+ * JGroups picks an IPv6 address if preferIPv4Stack is false or not set
+ * and preferIPv6Addresses is not set or is true. We want it to use an
+ * IPv4 address for a dual-IP stack so that both IPv4 and IPv6 messaging work
+ */
+ private void checkForIPv6() throws Exception {
+ boolean preferIpV6Addr = Boolean.getBoolean("java.net.preferIPv6Addresses");
+ if (!preferIpV6Addr) {
+ logger.debug("forcing JGroups to think IPv4 is being used so it will choose an IPv4 address");
+ Field m = org.jgroups.util.Util.class.getDeclaredField("ip_stack_type");
+ m.setAccessible(true);
+ m.set(null, org.jgroups.util.StackType.IPv4);
+ }
+ }
+
+ @Override
+ public void started() {
+ }
+
+ @Override
+ public void stop() {
+ if (this.myChannel != null) {
+ if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled()) || services.getManager().isReconnectingDS()) {
+ // leave the channel open for reconnect attempts
+ }
+ else {
+ this.myChannel.close();
+ }
+ }
+ }
+
+ @Override
+ public void stopped() {
+ }
+
+ @Override
+ public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect, String reason) {
+ }
+
+ @Override
+ public void installView(NetView v) {
+ this.view = v;
+
+ if (this.jgAddress.getVmViewId() < 0) {
+ this.jgAddress.setVmViewId(this.localAddress.getVmViewId());
+ }
+ List<JGAddress> mbrs = new ArrayList<JGAddress>(v.size());
+ for (InternalDistributedMember idm: v.getMembers()) {
+ mbrs.add(new JGAddress(idm));
+ }
+ ViewId vid = new ViewId(new JGAddress(v.getCoordinator()), v.getViewId());
+ View jgv = new View(vid, new ArrayList<Address>(mbrs));
+ logger.trace("installing JGroups view: {}", jgv);
+ this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv));
+
+ addressesWithioExceptionsProcessed.clear();
+ }
+
+
+ /**
+ * If JGroups is unable to send a message it may mean that the network
+ * is down. If so we need to initiate suspect processing on the
+ * recipient.<p>
+ * see Transport._send()
+ */
+ public void handleJGroupsIOException(IOException e, Address dest) {
+ if (services.getManager().shutdownInProgress()) { // GEODE-634 - don't log IOExceptions during shutdown
+ return;
+ }
+ if (addressesWithioExceptionsProcessed.contains(dest)) {
+ return;
+ }
+ addressesWithioExceptionsProcessed.add(dest);
+ NetView v = this.view;
+ JGAddress jgMbr = (JGAddress)dest;
+ if (jgMbr != null && v != null) {
+ List<InternalDistributedMember> members = v.getMembers();
+ InternalDistributedMember recipient = null;
+ for (InternalDistributedMember mbr: members) {
+ GMSMember gmsMbr = ((GMSMember)mbr.getNetMember());
+ if (jgMbr.getUUIDLsbs() == gmsMbr.getUuidLSBs()
+ && jgMbr.getUUIDMsbs() == gmsMbr.getUuidMSBs()
+ && jgMbr.getVmViewId() == gmsMbr.getVmViewId()) {
+ recipient = mbr;
+ break;
+ }
+ }
+ if (recipient != null) {
+ services.getHealthMonitor().suspect(recipient,
+ "Unable to send messages to this member via JGroups");
+ }
+ }
+ }
+
+ private void establishLocalAddress() {
+ UUID logicalAddress = (UUID)myChannel.getAddress();
+ logicalAddress = logicalAddress.copy();
+
+ IpAddress ipaddr = (IpAddress)myChannel.down(new Event(Event.GET_PHYSICAL_ADDRESS));
+
+ if (ipaddr != null) {
+ this.jgAddress = new JGAddress(logicalAddress, ipaddr);
+ }
+ else {
+ UDP udp = (UDP)myChannel.getProtocolStack().getTransport();
+
+ try {
+ Method getAddress = UDP.class.getDeclaredMethod("getPhysicalAddress");
+ getAddress.setAccessible(true);
+ ipaddr = (IpAddress)getAddress.invoke(udp, new Object[0]);
+ this.jgAddress = new JGAddress(logicalAddress, ipaddr);
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+ logger.info("Unable to find getPhysicallAddress method in UDP - parsing its address instead");
+ }
+
+// if (this.jgAddress == null) {
+// String addr = udp.getLocalPhysicalAddress();
+// int cidx = addr.lastIndexOf(':'); // IPv6 literals might have colons
+// String host = addr.substring(0, cidx);
+// int jgport = Integer.parseInt(addr.substring(cidx+1, addr.length()));
+// try {
+// this.jgAddress = new JGAddress(logicalAddress, new IpAddress(InetAddress.getByName(host), jgport));
+// } catch (UnknownHostException e) {
+// myChannel.disconnect();
+// throw new SystemConnectException("unable to initialize jgroups address", e);
+// }
+// }
+ }
+
+ // install the address in the JGroups channel protocols
+ myChannel.down(new Event(Event.SET_LOCAL_ADDRESS, this.jgAddress));
+
+ DistributionConfig config = services.getConfig().getDistributionConfig();
+ boolean isLocator = (services.getConfig().getTransport().getVmKind() == DistributionManager.LOCATOR_DM_TYPE)
+ || !services.getConfig().getDistributionConfig().getStartLocator().isEmpty();
+
+ // establish the DistributedSystem's address
+ DurableClientAttributes dca = null;
+ if (config.getDurableClientId() != null) {
+ dca = new DurableClientAttributes(config.getDurableClientId(), config
+ .getDurableClientTimeout());
+ }
+ MemberAttributes attr = new MemberAttributes(
+ -1/*dcPort - not known at this time*/,
+ OSProcess.getId(),
+ services.getConfig().getTransport().getVmKind(),
+ -1/*view id - not known at this time*/,
+ config.getName(),
+ MemberAttributes.parseGroups(config.getRoles(), config.getGroups()),
+ dca);
+ localAddress = new InternalDistributedMember(jgAddress.getInetAddress(),
+ jgAddress.getPort(), config.getEnableNetworkPartitionDetection(),
+ isLocator, attr);
+
+ // add the JGroups logical address to the GMSMember
+ UUID uuid = this.jgAddress;
+ ((GMSMember)localAddress.getNetMember()).setUUID(uuid);
+ ((GMSMember)localAddress.getNetMember()).setMemberWeight((byte)(services.getConfig().getMemberWeight() & 0xff));
+ }
+
+ @Override
+ public void beSick() {
+ }
+
+ @Override
+ public void playDead() {
+ }
+
+ @Override
+ public void beHealthy() {
+ }
+
+ @Override
+ public void addHandler(Class c, MessageHandler h) {
+ handlers.put(c, h);
+ }
+
+ @Override
+ public boolean testMulticast(long timeout) throws InterruptedException {
+ long pongsSnapshot = pongsReceived.longValue();
+ JGAddress dest = null;
+ try {
+ pingPonger.sendPingMessage(myChannel, jgAddress, dest);
+ } catch (Exception e) {
+ logger.warn("unable to send multicast message: {}", (jgAddress==null? "multicast recipients":jgAddress),
+ e.getMessage());
+ return false;
+ }
+ long giveupTime = System.currentTimeMillis() + timeout;
+ while (pongsReceived.longValue() == pongsSnapshot && System.currentTimeMillis() < giveupTime) {
+ Thread.sleep(100);
+ }
+ return pongsReceived.longValue() > pongsSnapshot;
+ }
+
+ @Override
+ public void getMessageState(InternalDistributedMember target, Map state, boolean includeMulticast) {
+ if (includeMulticast) {
+ NAKACK2 nakack = (NAKACK2)myChannel.getProtocolStack().findProtocol("NAKACK2");
+ if (nakack != null) {
+ long seqno = nakack.getCurrentSeqno();
+ state.put("JGroups.mcastState", Long.valueOf(seqno));
+ }
+ }
+ }
+
+ @Override
+ public void waitForMessageState(InternalDistributedMember sender, Map state) throws InterruptedException {
+ NAKACK2 nakack = (NAKACK2)myChannel.getProtocolStack().findProtocol("NAKACK2");
+ Long seqno = (Long)state.get("JGroups.mcastState");
+ if (nakack != null && seqno != null) {
+ waitForMessageState(nakack, sender, seqno);
+ }
+ }
+
+ /**
+ * wait for the mcast state from the given member to reach the given seqno
+ */
+ protected void waitForMessageState(NAKACK2 nakack, InternalDistributedMember sender, Long seqno)
+ throws InterruptedException {
+ long timeout = services.getConfig().getDistributionConfig().getAckWaitThreshold() * 1000L;
+ long startTime = System.currentTimeMillis();
+ long warnTime = startTime + timeout;
+ long quitTime = warnTime + timeout - 1000L;
+ boolean warned = false;
+
+ JGAddress jgSender = new JGAddress(sender);
+
+ for (;;) {
+ Digest digest = nakack.getDigest(jgSender);
+ if (digest == null) {
+ return;
+ }
+ String received = "none";
+ long[] senderSeqnos = digest.get(jgSender);
+ if (senderSeqnos == null || senderSeqnos[0] >= seqno.longValue()) {
+ break;
+ }
+ long now = System.currentTimeMillis();
+ if (!warned && now >= warnTime) {
+ warned = true;
+ if (senderSeqnos != null) {
+ received = String.valueOf(senderSeqnos[0]);
+ }
+ logger.warn("{} seconds have elapsed while waiting for multicast messages from {}. Received {} but expecting at least {}.",
+ Long.toString((warnTime-startTime)/1000L), sender, received, seqno);
+ }
+ if (now >= quitTime) {
+ throw new GemFireIOException("Multicast operations from " + sender + " did not distribute within " + (now - startTime) + " milliseconds");
+ }
+ Thread.sleep(50);
+ }
+ }
+
+ @Override
+ public Set<InternalDistributedMember> sendUnreliably(DistributionMessage msg) {
+ return send(msg, false);
+ }
+
+ @Override
+ public Set<InternalDistributedMember> send(DistributionMessage msg) {
+ return send(msg, true);
+ }
+
+ public Set<InternalDistributedMember> send(DistributionMessage msg, boolean reliably) {
+
+ // perform the same jgroups messaging as in 8.2's GMSMembershipManager.send() method
+
+ // BUT: when marshalling messages we need to include the version of the product and
+ // localAddress at the beginning of the message. These should be used in the receiver
+ // code to create a versioned input stream, read the sender address, then read the message
+ // and set its sender address
+ DMStats theStats = services.getStatistics();
+ NetView oldView = this.view;
+
+ if (!myChannel.isConnected()) {
+ logger.info("JGroupsMessenger channel is closed - messaging is not possible");
+ throw new DistributedSystemDisconnectedException("Distributed System is shutting down");
+ }
+
+ filterOutgoingMessage(msg);
+
+ // JGroupsMessenger does not support direct-replies, so register
+ // the message's processor if necessary
+ if ((msg instanceof DirectReplyMessage) && msg.isDirectAck() && msg.getProcessorId() <= 0) {
+ ((DirectReplyMessage)msg).registerProcessor();
+ }
+
+ InternalDistributedMember[] destinations = msg.getRecipients();
+ boolean allDestinations = msg.forAll();
+
+ boolean useMcast = false;
+ if (services.getConfig().getTransport().isMcastEnabled()) {
+ if (msg.getMulticast() || allDestinations) {
+ useMcast = services.getManager().isMulticastAllowed();
+ }
+ }
+
+ if (logger.isDebugEnabled() && reliably) {
+ String recips = useMcast? "multicast" : Arrays.toString(msg.getRecipients());
+ logger.debug("sending via JGroups: [{}] recipients: {}", msg, recips);
+ }
+
+ JGAddress local = this.jgAddress;
+
+ if (useMcast) {
+
+ long startSer = theStats.startMsgSerialization();
+ Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL);
+ theStats.endMsgSerialization(startSer);
+
+ Exception problem = null;
+ try {
+ jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK);
+ if (!reliably) {
+ jmsg.setFlag(Message.Flag.NO_RELIABILITY);
+ }
+ theStats.incSentBytes(jmsg.getLength());
+ logger.trace("Sending JGroups message: {}", jmsg);
+ myChannel.send(jmsg);
+ }
+ catch (Exception e) {
+ logger.debug("caught unexpected exception", e);
+ Throwable cause = e.getCause();
+ if (cause instanceof ForcedDisconnectException) {
+ problem = (Exception) cause;
+ } else {
+ problem = e;
+ }
+ if (services.getShutdownCause() != null) {
+ Throwable shutdownCause = services.getShutdownCause();
+ // If ForcedDisconnectException occurred then report it as actual
+ // problem.
+ if (shutdownCause instanceof ForcedDisconnectException) {
+ problem = (Exception) shutdownCause;
+ } else {
+ Throwable ne = problem;
+ while (ne.getCause() != null) {
+ ne = ne.getCause();
+ }
+ ne.initCause(services.getShutdownCause());
+ }
+ }
+ final String channelClosed = LocalizedStrings.GroupMembershipService_CHANNEL_CLOSED.toLocalizedString();
+// services.getManager().membershipFailure(channelClosed, problem);
+ throw new DistributedSystemDisconnectedException(channelClosed, problem);
+ }
+ } // useMcast
+ else { // ! useMcast
+ int len = destinations.length;
+ List<GMSMember> calculatedMembers; // explicit list of members
+ int calculatedLen; // == calculatedMembers.len
+ if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all
+ // Grab a copy of the current membership
+ NetView v = services.getJoinLeave().getView();
+
+ // Construct the list
+ calculatedLen = v.size();
+ calculatedMembers = new LinkedList<GMSMember>();
+ for (int i = 0; i < calculatedLen; i ++) {
+ InternalDistributedMember m = (InternalDistributedMember)v.get(i);
+ calculatedMembers.add((GMSMember)m.getNetMember());
+ }
+ } // send to all
+ else { // send to explicit list
+ calculatedLen = len;
+ calculatedMembers = new LinkedList<GMSMember>();
+ for (int i = 0; i < calculatedLen; i ++) {
+ calculatedMembers.add((GMSMember)destinations[i].getNetMember());
+ }
+ } // send to explicit list
+ Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>();
+ long startSer = theStats.startMsgSerialization();
+ boolean firstMessage = true;
+ for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) {
+ GMSMember mbr = it.next();
+ short version = mbr.getVersionOrdinal();
+ if ( !messages.containsKey(version) ) {
+ Message jmsg = createJGMessage(msg, local, version);
+ messages.put(version, jmsg);
+ if (firstMessage) {
+ theStats.incSentBytes(jmsg.getLength());
+ firstMessage = false;
+ }
+ }
+ }
+ theStats.endMsgSerialization(startSer);
+ Collections.shuffle(calculatedMembers);
+ int i=0;
+ for (GMSMember mbr: calculatedMembers) {
+ JGAddress to = new JGAddress(mbr);
+ short version = mbr.getVersionOrdinal();
+ Message jmsg = (Message)messages.get(version);
+ Exception problem = null;
+ try {
+ Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg;
+ if (!reliably) {
+ jmsg.setFlag(Message.Flag.NO_RELIABILITY);
+ }
+ tmp.setDest(to);
+ tmp.setSrc(this.jgAddress);
+ logger.trace("Unicasting to {}", to);
+ myChannel.send(tmp);
+ }
+ catch (Exception e) {
+ problem = e;
+ }
+ if (problem != null) {
+ Throwable cause = services.getShutdownCause();
+ if (cause != null) {
+ // If ForcedDisconnectException occurred then report it as actual
+ // problem.
+ if (cause instanceof ForcedDisconnectException) {
+ problem = (Exception) cause;
+ } else {
+ Throwable ne = problem;
+ while (ne.getCause() != null) {
+ ne = ne.getCause();
+ }
+ ne.initCause(cause);
+ }
+ }
+ final String channelClosed = LocalizedStrings.GroupMembershipService_CHANNEL_CLOSED.toLocalizedString();
+ // services.getManager().membershipFailure(channelClosed, problem);
+ throw new DistributedSystemDisconnectedException(channelClosed, problem);
+ }
+ } // send individually
+ } // !useMcast
+
+ // The contract is that every destination enumerated in the
+ // message should have received the message. If one left
+ // (i.e., left the view), we signal it here.
+ if (msg.forAll()) {
+ return Collections.emptySet();
+ }
+ Set<InternalDistributedMember> result = new HashSet<InternalDistributedMember>();
+ NetView newView = this.view;
+ if (newView != null && newView != oldView) {
+ for (int i = 0; i < destinations.length; i ++) {
+ InternalDistributedMember d = destinations[i];
+ if (!newView.contains(d)) {
+ logger.debug("messenger: member has left the view: {} view is now {}", d, newView);
+ result.add(d);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * This is the constructor to use to create a JGroups message holding a GemFire
+ * DistributionMessage. It sets the appropriate flags in the Message and properly
+ * serializes the DistributionMessage for the recipient's product version
+ *
+ * @param gfmsg the DistributionMessage
+ * @param src the sender address
+ * @param version the version of the recipient
+ * @return the new message
+ */
+ Message createJGMessage(DistributionMessage gfmsg, JGAddress src, short version) {
+ if(gfmsg instanceof DirectReplyMessage) {
+ ((DirectReplyMessage) gfmsg).registerProcessor();
+ }
+ Message msg = new Message();
+ msg.setDest(null);
+ msg.setSrc(src);
+ // GemFire uses its own reply processors so there is no need
+ // to maintain message order
+ msg.setFlag(Flag.OOB);
+ // Bundling is mostly only useful if we're doing no-ack work,
+ // which is fairly rare
+ msg.setFlag(Flag.DONT_BUNDLE);
+
+ //log.info("Creating message with payload " + gfmsg);
+ if (gfmsg.getProcessorType() == DistributionManager.HIGH_PRIORITY_EXECUTOR
+ || gfmsg instanceof HighPriorityDistributionMessage) {
+ msg.setFlag(Flag.NO_FC);
+ msg.setFlag(Flag.SKIP_BARRIER);
+ }
+ if (gfmsg instanceof DistributedCacheOperation.CacheOperationMessage) {
+ // we don't want to see our own cache operation messages
+ msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
+ }
+ try {
+ long start = services.getStatistics().startMsgSerialization();
+ HeapDataOutputStream out_stream =
+ new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
+ Version.CURRENT.writeOrdinal(out_stream, true);
+ DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
+ DataSerializer.writeObject(gfmsg, out_stream);
+ msg.setBuffer(out_stream.toByteArray());
+ services.getStatistics().endMsgSerialization(start);
+ }
+ catch(IOException | GemFireIOException ex) {
+ logger.warn("Error serializing message", ex);
+ if (ex instanceof GemFireIOException) {
+ throw (GemFireIOException)ex;
+ } else {
+ GemFireIOException ioe = new
+ GemFireIOException("Error serializing message");
+ ioe.initCause(ex);
+ throw ioe;
+ }
+ }
+ return msg;
+ }
+
+
+ /**
+ * deserialize a jgroups payload. If it's a DistributionMessage find
+ * the ID of the sender and establish it as the message's sender
+ */
+ Object readJGMessage(Message jgmsg) {
+ Object result = null;
+
+ int messageLength = jgmsg.getLength();
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("deserializing a message of length "+messageLength);
+ }
+
+ if (messageLength == 0) {
+ // jgroups messages with no payload are used for protocol interchange, such
+ // as STABLE_GOSSIP
+ logger.trace("message length is zero - ignoring");
+ return null;
+ }
+
+ InternalDistributedMember sender = null;
+
+ Exception problem = null;
+ byte[] buf = jgmsg.getRawBuffer();
+ try {
+ long start = services.getStatistics().startMsgDeserialization();
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf,
+ jgmsg.getOffset(), jgmsg.getLength()));
+
+ short ordinal = Version.readOrdinal(dis);
+
+ if (ordinal < Version.CURRENT_ORDINAL) {
+ dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
+ ordinal, true));
+ }
+
+ GMSMember m = DataSerializer.readObject(dis);
+
+ result = DataSerializer.readObject(dis);
+
+ DistributionMessage dm = (DistributionMessage)result;
+
+ // JoinRequestMessages are sent with an ID that may have been
+ // reused from a previous life by way of auto-reconnect,
+ // so we don't want to find a canonical reference for the
+ // request's sender ID
+ if (dm.getDSFID() == JOIN_REQUEST) {
+ sender = ((JoinRequestMessage)dm).getMemberID();
+ } else {
+ sender = getMemberFromView(m, ordinal);
+ }
+ ((DistributionMessage)result).setSender(sender);
+
+ services.getStatistics().endMsgDeserialization(start);
+ }
+ catch (ClassNotFoundException | IOException | RuntimeException e) {
+ problem = e;
+ }
+ if (problem != null) {
+ logger.error(LocalizedMessage.create(
+ LocalizedStrings.GroupMembershipService_EXCEPTION_DESERIALIZING_MESSAGE_PAYLOAD_0, jgmsg), problem);
+ return null;
+ }
+
+ return result;
+ }
+
+
+ /** look for certain messages that may need to be altered before being sent */
+ void filterOutgoingMessage(DistributionMessage m) {
+ switch (m.getDSFID()) {
+ case JOIN_RESPONSE:
+ JoinResponseMessage jrsp = (JoinResponseMessage)m;
+
+ if (jrsp.getRejectionMessage() == null
+ && services.getConfig().getTransport().isMcastEnabled()) {
+ // get the multicast message digest and pass it with the join response
+ Digest digest = (Digest)this.myChannel.getProtocolStack()
+ .getTopProtocol().down(Event.GET_DIGEST_EVT);
+ HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
+ try {
+ digest.writeTo(hdos);
+ } catch (Exception e) {
+ logger.fatal("Unable to serialize JGroups messaging digest", e);
+ }
+ jrsp.setMessengerData(hdos.toByteArray());
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ void filterIncomingMessage(DistributionMessage m) {
+ switch (m.getDSFID()) {
+ case JOIN_RESPONSE:
+ JoinResponseMessage jrsp = (JoinResponseMessage)m;
+
+ if (jrsp.getRejectionMessage() == null
+ && services.getConfig().getTransport().isMcastEnabled()) {
+ byte[] serializedDigest = jrsp.getMessengerData();
+ ByteArrayInputStream bis = new ByteArrayInputStream(serializedDigest);
+ DataInputStream dis = new DataInputStream(bis);
+ try {
+ Digest digest = new Digest();
+ digest.readFrom(dis);
+ if (digest != null) {
+ logger.trace("installing JGroups message digest {}", digest);
+ this.myChannel.getProtocolStack()
+ .getTopProtocol().down(new Event(Event.SET_DIGEST, digest));
+ jrsp.setMessengerData(null);
+ }
+ } catch (Exception e) {
+ logger.fatal("Unable to read JGroups messaging digest", e);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public InternalDistributedMember getMemberID() {
+ return localAddress;
+ }
+
+ /**
+ * returns the JGroups configuration string, for testing
+ */
+ public String getJGroupsStackConfig() {
+ return this.jgStackConfig;
+ }
+
+ /**
+ * returns the pinger, for testing
+ */
+ public GMSPingPonger getPingPonger() {
+ return this.pingPonger;
+ }
+
+ /**
+ * for unit testing we need to replace UDP with a fake UDP protocol
+ */
+ public void setJGroupsStackConfigForTesting(String config) {
+ this.jgStackConfig = config;
+ }
+
+ /**
+ * returns the member ID for the given GMSMember object
+ */
+ private InternalDistributedMember getMemberFromView(GMSMember jgId, short version) {
+ NetView v = services.getJoinLeave().getView();
+
+ if (v != null) {
+ for (InternalDistributedMember m: v.getMembers()) {
+ if (((GMSMember)m.getNetMember()).equals(jgId)) {
+ return m;
+ }
+ }
+ }
+ return new InternalDistributedMember(jgId);
+ }
+
+
+ @Override
+ public void emergencyClose() {
+ this.view = null;
+ if (this.myChannel != null) {
+ if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled()) || services.getManager().isReconnectingDS()) {
+ }
+ else {
+ this.myChannel.disconnect();
+ }
+ }
+ }
+
+ public QuorumChecker getQuorumChecker() {
+ NetView view = this.view;
+ if (view == null) {
+ view = services.getJoinLeave().getView();
+ if (view == null) {
+ view = services.getJoinLeave().getPreviousView();
+ if (view == null) {
+ return null;
+ }
+ }
+ }
+ GMSQuorumChecker qc = new GMSQuorumChecker(
+ view, services.getConfig().getLossThreshold(),
+ this.myChannel);
+ qc.initialize();
+ return qc;
+ }
+ /**
+ * JGroupsReceiver receives incoming JGroups messages and passes them to a handler.
+ * It may be accessed through JChannel.getReceiver().
+ */
+ class JGroupsReceiver extends ReceiverAdapter {
+
+ @Override
+ public void receive(Message jgmsg) {
- if (services.getManager().shutdownInProgress()) {
- return;
- }
-
- if (logger.isTraceEnabled()) {
- logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
- }
-
- //Respond to ping messages sent from other systems that are in a auto reconnect state
- byte[] contents = jgmsg.getBuffer();
- if (contents == null) {
- return;
- }
- if (pingPonger.isPingMessage(contents)) {
++ long startTime = DistributionStats.getStatTime();
++ try {
++ if (services.getManager().shutdownInProgress()) {
++ return;
++ }
++
++ if (logger.isTraceEnabled()) {
++ logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
++ }
++
++ //Respond to ping messages sent from other systems that are in a auto reconnect state
++ byte[] contents = jgmsg.getBuffer();
++ if (contents == null) {
++ return;
++ }
++ if (pingPonger.isPingMessage(contents)) {
++ try {
++ pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc());
++ }
++ catch (Exception e) {
++ logger.info("Failed sending Pong response to " + jgmsg.getSrc());
++ }
++ return;
++ } else if (pingPonger.isPongMessage(contents)) {
++ pongsReceived.incrementAndGet();
++ return;
++ }
++
++ Object o = readJGMessage(jgmsg);
++ if (o == null) {
++ return;
++ }
++
++ DistributionMessage msg = (DistributionMessage)o;
++ assert msg.getSender() != null;
++
++ // admin-only VMs don't have caches, so we ignore cache operations
++ // multicast to them, avoiding deserialization cost and classpath
++ // problems
++ if ( (services.getConfig().getTransport().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE)
++ && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) {
++ return;
++ }
++
++ msg.resetTimestamp();
++ msg.setBytesRead(jgmsg.getLength());
++
+ try {
- pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc());
++ logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
++ filterIncomingMessage(msg);
++ getMessageHandler(msg).processMessage(msg);
+ }
- catch (Exception e) {
- logger.info("Failed sending Pong response to " + jgmsg.getSrc());
++ catch (MemberShunnedException e) {
++ // message from non-member - ignore
+ }
- return;
- } else if (pingPonger.isPongMessage(contents)) {
- pongsReceived.incrementAndGet();
- return;
- }
-
- Object o = readJGMessage(jgmsg);
- if (o == null) {
- return;
- }
-
- DistributionMessage msg = (DistributionMessage)o;
- assert msg.getSender() != null;
-
- // admin-only VMs don't have caches, so we ignore cache operations
- // multicast to them, avoiding deserialization cost and classpath
- // problems
- if ( (services.getConfig().getTransport().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE)
- && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) {
- return;
- }
-
- msg.resetTimestamp();
- msg.setBytesRead(jgmsg.getLength());
-
- try {
- logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
- filterIncomingMessage(msg);
- getMessageHandler(msg).processMessage(msg);
- }
- catch (MemberShunnedException e) {
- // message from non-member - ignore
++ }finally {
++ long delta = DistributionStats.getStatTime() - startTime ;
++ JGroupsMessenger.this.services.getStatistics().incUDPDispatchRequestTime(delta);
+ }
+ }
+
+ /**
+ * returns the handler that should process the given message.
+ * The default handler is the membership manager
+ * @param msg
+ * @return
+ */
+ private MessageHandler getMessageHandler(DistributionMessage msg) {
+ Class<?> msgClazz = msg.getClass();
+ MessageHandler h = handlers.get(msgClazz);
+ if (h == null) {
+ for (Class<?> clazz: handlers.keySet()) {
+ if (clazz.isAssignableFrom(msgClazz)) {
+ h = handlers.get(clazz);
+ handlers.put(msg.getClass(), h);
+ break;
+ }
+ }
+ }
+ if (h == null) {
+ h = (MessageHandler)services.getManager();
+ }
+ return h;
+ }
+ }
+
+
+
+}